tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI. (hitesh)
Date Mon, 27 Apr 2015 20:51:26 GMT
Repository: tez
Updated Branches:
  refs/heads/master 21d4e2df0 -> b08ca37e3


TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b08ca37e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b08ca37e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b08ca37e

Branch: refs/heads/master
Commit: b08ca37e31595dab4941dc7ed0736464d0223bc9
Parents: 21d4e2d
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Apr 27 13:50:28 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Apr 27 13:50:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/ATSConstants.java     |   4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  22 ++-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +-
 .../tez/dag/history/HistoryEventType.java       |   3 +-
 .../dag/history/events/DAGFinishedEvent.java    |  15 ++-
 .../dag/history/events/DAGRecoveredEvent.java   | 124 +++++++++++++++++
 .../impl/HistoryEventJsonConversion.java        |  42 ++++++
 .../apache/tez/dag/app/TestRecoveryParser.java  |   4 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |   2 +-
 .../TestHistoryEventsProtoConversion.java       |  23 +++-
 .../impl/TestHistoryEventJsonConversion.java    |   7 +-
 .../ats/HistoryEventTimelineConversion.java     |  35 +++++
 .../ats/TestHistoryEventTimelineConversion.java | 133 +++++++++++++++----
 14 files changed, 376 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e78ee7e..a5c4a57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -146,6 +146,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI.
   TEZ-2365. Update tez-ui war's license/notice to reflect OFL license correctly.
   TEZ-2329. UI Query on final dag status performance improvement
   TEZ-2287. Deprecate VertexManagerPluginContext.getTaskContainer().

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 26170f4..fd82e20 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -35,12 +35,12 @@ public class ATSConstants {
   public static final String EVENT_INFO = "eventinfo";
   public static final String RELATED_ENTITIES = "relatedEntities";
   public static final String PRIMARY_FILTERS = "primaryfilters";
-  public static final String SECONDARY_FILTERS = "secondaryfilters";
   public static final String OTHER_INFO = "otherinfo";
 
   /* Section for related entities */
   public static final String APPLICATION_ID = "applicationId";
   public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
+  public static final String COMPLETION_APPLICATION_ATTEMPT_ID = "completionApplicationAttemptId";
   public static final String CONTAINER_ID = "containerId";
   public static final String NODE_ID = "nodeId";
   public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress";
@@ -52,6 +52,8 @@ public class ATSConstants {
   /* Tez-specific info */
   public static final String DAG_PLAN = "dagPlan";
   public static final String DAG_NAME = "dagName";
+  public static final String DAG_STATE = "dagState";
+  public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason";
   public static final String VERTEX_NAME = "vertexName";
   public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
   public static final String SCHEDULED_TIME = "scheduledTime";

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8a914f6..90935ac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -58,10 +58,9 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -162,6 +161,8 @@ import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
 import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -1736,18 +1737,35 @@ public class DAGAppMaster extends AbstractService {
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                   DAGState.FAILED, classpathUrls);
+          DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
+              recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
+              recoveredDAGData.recoveredDAG.getUserName(),
+              this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason);
+          this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
+              dagRecoveredEvent));
           dagEventDispatcher.handle(recoverDAGEvent);
           this.state = DAGAppMasterState.RUNNING;
         } else {
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                   recoveredDAGData.dagState, classpathUrls);
+          DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
+              recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
+              recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(),
+              recoveredDAGData.dagState, null);
+          this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
+              dagRecoveredEvent));
           dagEventDispatcher.handle(recoverDAGEvent);
           this.state = DAGAppMasterState.RUNNING;
         }
       } else {
         LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
         _updateLoggers(recoveredDAGData.recoveredDAG, "");
+        DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
+            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
+            recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime());
+        this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
+            dagRecoveredEvent));
         DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
             recoveredDAGData.recoveredDAG.getID(), classpathUrls);
         dagEventDispatcher.handle(recoverDAGEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 5540285..c47a0d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1127,7 +1127,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
-        this.userName, this.dagName, taskStats);
+        this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId());
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -1151,7 +1151,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
-        getAllCounters(), this.userName, this.dagName, taskStats);
+        getAllCounters(), this.userName, this.dagName, taskStats,
+        this.appContext.getApplicationAttemptId());
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 17df58f..6949d21 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -40,5 +40,6 @@ public enum HistoryEventType {
   DAG_COMMIT_STARTED,
   VERTEX_COMMIT_STARTED,
   VERTEX_GROUP_COMMIT_STARTED,
-  VERTEX_GROUP_COMMIT_FINISHED
+  VERTEX_GROUP_COMMIT_FINISHED,
+  DAG_RECOVERED
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index b10a876..2f173a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -23,8 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -41,8 +40,6 @@ import com.google.protobuf.ByteString;
 
 public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DAGFinishedEvent.class);
-
   private TezDAGID dagID;
   private long startTime;
   private long finishTime;
@@ -53,13 +50,16 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   private String dagName;
   Map<String, Integer> dagTaskStats;
 
+  private ApplicationAttemptId applicationAttemptId;
+
   public DAGFinishedEvent() {
   }
 
   public DAGFinishedEvent(TezDAGID dagId, long startTime,
       long finishTime, DAGState state,
       String diagnostics, TezCounters counters,
-      String user, String dagName, Map<String, Integer> dagTaskStats) {
+      String user, String dagName, Map<String, Integer> dagTaskStats,
+      ApplicationAttemptId applicationAttemptId) {
     this.dagID = dagId;
     this.startTime = startTime;
     this.finishTime = finishTime;
@@ -69,6 +69,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
     this.user = user;
     this.dagName = dagName;
     this.dagTaskStats = dagTaskStats;
+    this.applicationAttemptId = applicationAttemptId;
   }
 
   @Override
@@ -202,4 +203,8 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
     return dagTaskStats;
   }
 
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
new file mode 100644
index 0000000..5b44de2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGRecoveredEvent implements HistoryEvent {
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final TezDAGID dagID;
+  private final long recoveredTime;
+  private final DAGState recoveredDagState;
+  private final String recoveryFailureReason;
+  private final String dagName;
+  private final String user;
+
+  public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId,
+      TezDAGID dagId, String dagName, String user,
+      long recoveredTime, DAGState recoveredState,
+      String recoveryFailureReason) {
+    this.applicationAttemptId = applicationAttemptId;
+    this.dagID = dagId;
+    this.dagName = dagName;
+    this.user = user;
+    this.recoveredTime = recoveredTime;
+    this.recoveredDagState = recoveredState;
+    this.recoveryFailureReason = recoveryFailureReason;
+  }
+
+  public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId,
+      TezDAGID dagId, String dagName, String user, long recoveredTime) {
+    this(applicationAttemptId, dagId, dagName, user, recoveredTime, null, null);
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.DAG_RECOVERED;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return false;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    throw new UnsupportedOperationException("Invalid operation for eventType "
+        + getEventType().name());
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    throw new UnsupportedOperationException("Invalid operation for eventType "
+        + getEventType().name());
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public long getRecoveredTime() {
+    return recoveredTime;
+  }
+
+  public DAGState getRecoveredDagState() {
+    return recoveredDagState;
+  }
+
+  public String getRecoveryFailureReason() {
+    return recoveryFailureReason;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public String toString() {
+    return "applicationAttemptId="
+        + (applicationAttemptId != null ? applicationAttemptId.toString() : "null")
+        + ", dagId=" + (dagID != null ? dagID.toString() : "null")
+        + ", recoveredTime=" + recoveredTime
+        + ", recoveredState=" + (recoveredDagState != null ? recoveredDagState.name() : "null" )
+        + ", recoveryFailureReason=" + recoveryFailureReason;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 22d95d8..07ce2f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -110,6 +111,9 @@ public class HistoryEventJsonConversion {
       case VERTEX_PARALLELISM_UPDATED:
         jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent);
         break;
+      case DAG_RECOVERED:
+        jsonObject = convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent);
+        break;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
@@ -124,6 +128,42 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
+  private static JSONObject convertDAGRecoveredEvent(DAGRecoveredEvent event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONArray events = new JSONArray();
+    JSONObject recoverEvent = new JSONObject();
+    recoverEvent.put(ATSConstants.TIMESTAMP, event.getRecoveredTime());
+    recoverEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_RECOVERED.name());
+
+    JSONObject recoverEventInfo = new JSONObject();
+    recoverEventInfo.put(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    if (event.getRecoveredDagState() != null) {
+      recoverEventInfo.put(ATSConstants.DAG_STATE, event.getRecoveredDagState().name());
+    }
+    if (event.getRecoveryFailureReason() != null) {
+      recoverEventInfo.put(ATSConstants.RECOVERY_FAILURE_REASON,
+          event.getRecoveryFailureReason());
+    }
+
+    recoverEvent.put(ATSConstants.EVENT_INFO, recoverEventInfo);
+    events.put(recoverEvent);
+
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
   private static JSONObject convertAppLaunchedEvent(AppLaunchedEvent event) throws JSONException {
     JSONObject jsonObject = new JSONObject();
     jsonObject.put(ATSConstants.ENTITY,
@@ -327,6 +367,8 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    otherInfo.put(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
 
     final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
     if (dagTaskStats != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index a256244..4bb0615 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -169,6 +169,7 @@ public class TestRecoveryParser {
   // skipAllOtherEvents due to dag finished
   @Test (timeout = 5000)
   public void testSkipAllOtherEvents_2() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     TezDAGID dagID = TezDAGID.getInstance(appId, 1);
     AppContext appContext = mock(AppContext.class);
@@ -188,7 +189,8 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
     rService.handle(new DAGHistoryEvent(dagID,
-        new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null)));
+        new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null,
+            appAttemptId)));
     rService.handle(new DAGHistoryEvent(dagID, new DAGStartedEvent(dagID, 1L, "user", "dag1")));
     rService.stop();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 58c55c2..bd4653b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -144,7 +144,7 @@ public class TestDAGRecovery {
   private void restoreFromDAGFinishedEvent(DAGState finalState) {
     DAGState recoveredState =
         dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
-            finalState, "", tezCounters, user, dagName, null));
+            finalState, "", tezCounters, user, dagName, null, null));
     assertEquals(finishTime, dag.finishTime);
     assertFalse(dag.recoveryCommitInProgress);
     assertEquals(finalState, recoveredState);

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bf61ff0..302700c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -217,7 +217,7 @@ public class TestHistoryEventsProtoConversion {
     {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, null, null, "user", "dagName", null);
+          DAGState.FAILED, null, null, "user", "dagName", null, null);
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -238,7 +238,7 @@ public class TestHistoryEventsProtoConversion {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
           DAGState.FAILED, "bad diagnostics", tezCounters,
-          "user", "dagName", null);
+          "user", "dagName", null, null);
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -717,10 +717,29 @@ public class TestHistoryEventsProtoConversion {
         case VERTEX_GROUP_COMMIT_FINISHED:
           testVertexGroupCommitFinishedEvent();
           break;
+        case DAG_RECOVERED:
+          testDAGRecoveredEvent();
+          break;
         default:
           throw new Exception("Unhandled Event type in Unit tests: " + eventType);
         }
       }
     }
 
+  private void testDAGRecoveredEvent() {
+    DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
+        "mockDagname", "mockuser", 100334l);
+    try {
+      testProtoConversion(dagRecoveredEvent);
+      Assert.fail("Proto conversion should have failed");
+    } catch (UnsupportedOperationException e) {
+      // Expected
+    } catch (IOException e) {
+      Assert.fail("Proto conversion should have failed with Unsupported Exception");
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index bbf29e3..c6749af 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -132,7 +133,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case DAG_FINISHED:
           event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
-              null, null, user, dagPlan.getName(), null);
+              null, null, user, dagPlan.getName(), null, applicationAttemptId);
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
@@ -186,6 +187,10 @@ public class TestHistoryEventJsonConversion {
         case VERTEX_GROUP_COMMIT_FINISHED:
           event = new VertexGroupCommitFinishedEvent();
           break;
+        case DAG_RECOVERED:
+          event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user,
+              1l);
+          break;
         default:
           Assert.fail("Unhandled event type " + eventType);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index fdd8f19..7c804f5 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -40,6 +40,7 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -115,6 +116,10 @@ public class HistoryEventTimelineConversion {
         timelineEntity = convertVertexParallelismUpdatedEvent(
             (VertexParallelismUpdatedEvent) historyEvent);
         break;
+      case DAG_RECOVERED:
+        timelineEntity = convertDAGRecoveredEvent(
+            (DAGRecoveredEvent) historyEvent);
+        break;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
@@ -129,6 +134,34 @@ public class HistoryEventTimelineConversion {
     return timelineEntity;
   }
 
+  private static TimelineEntity convertDAGRecoveredEvent(DAGRecoveredEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent recoverEvt = new TimelineEvent();
+    recoverEvt.setEventType(HistoryEventType.DAG_RECOVERED.name());
+    recoverEvt.setTimestamp(event.getRecoveredTime());
+    recoverEvt.addEventInfo(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    if (event.getRecoveredDagState() != null) {
+      recoverEvt.addEventInfo(ATSConstants.DAG_STATE, event.getRecoveredDagState().name());
+    }
+    if (event.getRecoveryFailureReason() != null) {
+      recoverEvt.addEventInfo(ATSConstants.RECOVERY_FAILURE_REASON,
+          event.getRecoveryFailureReason());
+    }
+
+    atsEntity.addEvent(recoverEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    return atsEntity;
+  }
+
   private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId("tez_"
@@ -270,6 +303,8 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    atsEntity.addOtherInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
 
     final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
     if (dagTaskStats != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b08ca37e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 14330ba..3d2b662 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -137,7 +138,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case DAG_FINISHED:
           event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
-              null, null, user, dagPlan.getName(), null);
+              null, null, user, dagPlan.getName(), null, applicationAttemptId);
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
@@ -191,6 +192,10 @@ public class TestHistoryEventTimelineConversion {
         case VERTEX_GROUP_COMMIT_FINISHED:
           event = new VertexGroupCommitFinishedEvent();
           break;
+        case DAG_RECOVERED:
+          event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(),
+              user, random.nextLong());
+          break;
         default:
           Assert.fail("Unhandled event type " + eventType);
       }
@@ -238,15 +243,15 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION));
     Assert.assertEquals(user, timelineEntity.getOtherInfo().get(ATSConstants.USER));
     Assert.assertEquals(applicationId.toString(),
-           timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
+        timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
 
     Map<String, String> config =
-        (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
+        (Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
     Assert.assertEquals(conf.get("foo"), config.get("foo"));
     Assert.assertEquals(conf.get("applicationId"), config.get("applicationId"));
 
     Map<String, String> versionInfo =
-        (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION);
+        (Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION);
     Assert.assertEquals(mockVersionInfo.getVersion(),
         versionInfo.get(ATSConstants.VERSION));
     Assert.assertEquals(mockVersionInfo.getRevision(),
@@ -310,7 +315,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
         applicationAttemptId.getApplicationId().toString()));
 
-    Assert.assertEquals(containerId.toString(),timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
+    Assert.assertEquals(containerId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
 
     Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
 
@@ -381,6 +386,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME));
     Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS));
   }
+
   @Test(timeout = 5000)
   public void testConvertDAGSubmittedEvent() {
     long submitTime = random.nextLong();
@@ -426,11 +432,11 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(applicationAttemptId.getApplicationId().toString(),
         timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
     Assert.assertEquals(user,
-           timelineEntity.getOtherInfo().get(ATSConstants.USER));
+        timelineEntity.getOtherInfo().get(ATSConstants.USER));
   }
 
   @Test(timeout = 5000)
-  public void testConvertTaskAttemptFinishedEvent(){
+  public void testConvertTaskAttemptFinishedEvent() {
     String vertexName = "testVertex";
     long startTime = random.nextLong();
     long finishTime = startTime + 1234;
@@ -478,7 +484,7 @@ public class TestHistoryEventTimelineConversion {
   public void testConvertDAGInitializedEvent() {
     long initTime = random.nextLong();
 
-    Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+    Map<String, TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
     nameIdMap.put("foo", tezVertexID);
 
     DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
@@ -518,12 +524,12 @@ public class TestHistoryEventTimelineConversion {
   public void testConvertDAGFinishedEvent() {
     long finishTime = random.nextLong();
     long startTime = random.nextLong();
-    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    Map<String, Integer> taskStats = new HashMap<String, Integer>();
     taskStats.put("FOO", 100);
     taskStats.put("BAR", 200);
 
     DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
-        "diagnostics", null, user, dagPlan.getName(), taskStats);
+        "diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -549,21 +555,23 @@ public class TestHistoryEventTimelineConversion {
             DAGState.ERROR.name()));
 
     Assert.assertEquals(startTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
     Assert.assertEquals(finishTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
     Assert.assertEquals(finishTime - startTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
     Assert.assertEquals(DAGState.ERROR.name(),
         timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
     Assert.assertEquals("diagnostics",
         timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+    Assert.assertEquals(applicationAttemptId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID));
 
     Assert.assertEquals(100,
-        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+        ((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue());
     Assert.assertEquals(200,
-        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+        ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
   }
 
   @Test(timeout = 5000)
@@ -602,13 +610,13 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME));
 
     Assert.assertEquals(initedTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
     Assert.assertEquals(initRequestedTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
     Assert.assertEquals(initedTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
     Assert.assertEquals(numTasks,
-        ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+        ((Integer) timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
   }
 
   @Test(timeout = 5000)
@@ -618,12 +626,12 @@ public class TestHistoryEventTimelineConversion {
     long startRequestedTime = random.nextLong();
     long startTime = random.nextLong();
     long finishTime = random.nextLong();
-    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    Map<String, Integer> taskStats = new HashMap<String, Integer>();
     taskStats.put("FOO", 100);
     taskStats.put("BAR", 200);
     VertexStats vertexStats = new VertexStats();
 
-    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime,
+    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1, initRequestedTime,
         initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
         "diagnostics", null, vertexStats, taskStats);
 
@@ -649,9 +657,9 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
 
     Assert.assertEquals(finishTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
     Assert.assertEquals(finishTime - startTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
     Assert.assertEquals(VertexState.ERROR.name(),
         timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
     Assert.assertEquals("diagnostics",
@@ -660,9 +668,9 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
 
     Assert.assertEquals(100,
-        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+        ((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue());
     Assert.assertEquals(200,
-        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+        ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
   }
 
   @Test(timeout = 5000)
@@ -702,9 +710,9 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
 
     Assert.assertEquals(scheduleTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
     Assert.assertEquals(startTime,
-        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+        ((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
     Assert.assertTrue(TaskState.SCHEDULED.name()
         .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
   }
@@ -847,5 +855,74 @@ public class TestHistoryEventTimelineConversion {
 
   }
 
+  @Test(timeout = 5000)
+  public void testConvertDAGRecoveredEvent() {
+    long recoverTime = random.nextLong();
+
+    DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
+        dagPlan.getName(), user, recoverTime);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(recoverTime, timelineEvent.getTimestamp());
+
+    Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID));
+    Assert.assertEquals(applicationAttemptId.toString(),
+        timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID));
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGRecoveredEvent2() {
+    long recoverTime = random.nextLong();
+
+    DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
+        dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason");
+
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(recoverTime, timelineEvent.getTimestamp());
+
+    Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID));
+    Assert.assertEquals(applicationAttemptId.toString(),
+        timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID));
+    Assert.assertEquals(DAGState.ERROR.name(),
+        timelineEvent.getEventInfo().get(ATSConstants.DAG_STATE));
+    Assert.assertEquals("mock reason",
+        timelineEvent.getEventInfo().get(ATSConstants.RECOVERY_FAILURE_REASON));
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+  }
+
 
-}
+}
\ No newline at end of file


Mime
View raw message