tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [3/3] git commit: TEZ-799. Generate data to be used for recovery. (hitesh)
Date Fri, 07 Feb 2014 04:05:33 GMT
TEZ-799. Generate data to be used for recovery. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/14127af1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/14127af1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/14127af1

Branch: refs/heads/master
Commit: 14127af127f709c2a8ae58d6e32c5b84f16a6786
Parents: 38e545b
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Feb 6 20:04:54 2014 -0800
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Feb 6 20:04:54 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 .../apache/tez/dag/api/TezConfiguration.java    |  21 ++
 .../org/apache/tez/dag/records/TezDAGID.java    |   4 +-
 .../tez/dag/records/TezTaskAttemptID.java       |  34 ++-
 .../org/apache/tez/dag/records/TezTaskID.java   |  24 +-
 .../org/apache/tez/dag/records/TezVertexID.java |  21 +-
 tez-dag/pom.xml                                 |  26 +-
 tez-dag/src/main/avro/HistoryEvents.avpr        | 191 -------------
 .../java/org/apache/tez/dag/app/AppContext.java |   9 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 103 ++++++-
 .../apache/tez/dag/app/DAGAppMasterState.java   |   1 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   4 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  28 +-
 .../app/dag/impl/RootInputVertexManager.java    |  16 ++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   9 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   9 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  53 +++-
 .../dag/app/launcher/ContainerLauncherImpl.java |   5 +-
 .../apache/tez/dag/history/DAGHistoryEvent.java |  16 +-
 .../apache/tez/dag/history/HistoryEvent.java    |  16 +-
 .../tez/dag/history/HistoryEventHandler.java    |  66 ++++-
 .../tez/dag/history/HistoryEventType.java       |  37 +++
 .../apache/tez/dag/history/SummaryEvent.java    |  28 ++
 .../apache/tez/dag/history/ats/ATSService.java  | 131 +++++++++
 .../apache/tez/dag/history/ats/EntityTypes.java |  28 ++
 .../tez/dag/history/events/AMLaunchedEvent.java | 142 ++++++++++
 .../tez/dag/history/events/AMStartedEvent.java  | 105 ++++++--
 .../tez/dag/history/events/AvroUtils.java       |  53 ----
 .../history/events/ContainerLaunchedEvent.java  | 109 +++++++-
 .../dag/history/events/DAGFinishedEvent.java    | 135 ++++++++--
 .../dag/history/events/DAGInitializedEvent.java |  97 +++++++
 .../tez/dag/history/events/DAGStartedEvent.java |  89 +++++--
 .../dag/history/events/DAGSubmittedEvent.java   | 182 +++++++++++++
 .../events/TaskAttemptFinishedEvent.java        | 124 +++++++--
 .../history/events/TaskAttemptStartedEvent.java | 128 +++++++--
 .../dag/history/events/TaskFinishedEvent.java   | 119 +++++++--
 .../dag/history/events/TaskStartedEvent.java    | 108 ++++++--
 .../VertexDataMovementEventsGeneratedEvent.java | 209 +++++++++++++++
 .../dag/history/events/VertexFinishedEvent.java | 138 ++++++++--
 .../history/events/VertexInitializedEvent.java  | 149 +++++++++++
 .../dag/history/events/VertexStartedEvent.java  | 113 ++++++--
 .../dag/history/recovery/RecoveryService.java   | 267 +++++++++++++++++++
 .../tez/dag/history/utils/ATSConstants.java     |  73 +++++
 .../apache/tez/dag/history/utils/DAGUtils.java  | 152 +++++++++++
 .../apache/tez/dag/recovery/RecoveryParser.java | 186 +++++++++++++
 .../org/apache/tez/dag/utils/ProtoUtils.java    |  34 +++
 tez-dag/src/main/proto/HistoryEvents.proto      | 144 ++++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  16 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  13 +-
 .../mapreduce/examples/OrderedWordCount.java    |   7 +-
 .../org/apache/tez/common/ContainerTask.java    |   1 +
 .../org/apache/tez/common/ProtoConverters.java  |  89 +++++++
 .../apache/tez/runtime/api/impl/TezEvent.java   |  44 +--
 53 files changed, 3344 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7ebc4ee..30d818a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,6 +299,11 @@
         <artifactId>guava</artifactId>
         <version>15.0</version>
       </dependency>
+      <dependency>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+        <version>1.3.4</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1616adc..6a67e70 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -329,4 +329,25 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   public static final String TEZ_SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+
+
+  @Private
+  public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
+
+  public static final String YARN_ATS_ENABLED =
+      TEZ_PREFIX + "yarn.ats.enabled";
+  public static final boolean YARN_ATS_ENABLED_DEFAULT = false;
+
+  public static final String DAG_RECOVERY_ENABLED =
+      TEZ_PREFIX + "dag.recovery.enabled";
+  public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = false;
+
+  public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
+      TEZ_PREFIX + "dag.recovery.io.buffer.size";
+  public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
+
+  public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
+  public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
+  public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 95bd4fe..d12f760 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -144,7 +144,7 @@ public class TezDAGID extends TezID {
 
   // DO NOT CHANGE THIS. DAGClient replicates this code to create DAG id string
   public static final String DAG = "dag";
-  protected static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
     @Override
     public NumberFormat initialValue() {
       NumberFormat fmt = NumberFormat.getInstance();
@@ -154,7 +154,7 @@ public class TezDAGID extends TezID {
     }
   };
   
-  protected static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
     @Override
     public NumberFormat initialValue() {
       NumberFormat fmt = NumberFormat.getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 07f8155..0896b99 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -21,10 +21,10 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tez.dag.records.TezTaskID;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -144,4 +144,36 @@ public class TezTaskAttemptID extends TezID {
     taskId.write(out);
     super.write(out);
   }
+
+  protected static final ThreadLocal<NumberFormat> tezTaskAttemptIdFormat = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat fmt = NumberFormat.getInstance();
+      fmt.setGroupingUsed(false);
+      fmt.setMinimumIntegerDigits(1);
+      return fmt;
+    }
+  };
+
+  public static TezTaskAttemptID fromString(String taIdStr) {
+    try {
+      String[] split = taIdStr.split("_");
+      String rmId = split[1];
+      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
+      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
+      int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
+      int taskId = TezTaskID.tezTaskIdFormat.get().parse(split[5]).intValue();
+      int id = tezTaskAttemptIdFormat.get().parse(split[6]).intValue();
+
+      return TezTaskAttemptID.getInstance(
+          TezTaskID.getInstance(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(rmId, appId, dagId),
+                  vId), taskId), id);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 282421a..0a7bc4a 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -25,7 +25,6 @@ import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tez.dag.records.TezVertexID;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -46,7 +45,7 @@ import com.google.common.cache.LoadingCache;
 public class TezTaskID extends TezID {
   public static final String TASK = "task";
   
-  protected static final ThreadLocal<NumberFormat> idFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() {
     @Override
     public NumberFormat initialValue() {
       NumberFormat fmt = NumberFormat.getInstance();
@@ -125,7 +124,7 @@ public class TezTaskID extends TezID {
   protected StringBuilder appendTo(StringBuilder builder) {
     return vertexId.appendTo(builder).
                  append(SEPARATOR).
-                 append(idFormat.get().format(id));
+                 append(tezTaskIdFormat.get().format(id));
   }
 
   @Override
@@ -153,4 +152,23 @@ public class TezTaskID extends TezID {
     super.write(out);
   }
 
+  public static TezTaskID fromString(String taskIdStr) {
+    try {
+      String[] split = taskIdStr.split("_");
+      String rmId = split[1];
+      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
+      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
+      int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
+      int id = tezTaskIdFormat.get().parse(split[5]).intValue();
+
+      return TezTaskID.getInstance(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(rmId, appId, dagId),
+                  vId), id);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index b3930cf..a9b2625 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -46,7 +46,7 @@ import com.google.common.cache.LoadingCache;
 @InterfaceStability.Stable
 public class TezVertexID extends TezID {
   public static final String VERTEX = "vertex";
-  protected static final ThreadLocal<NumberFormat> idFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<NumberFormat> tezVertexIdFormat = new ThreadLocal<NumberFormat>() {
 
     @Override
     public NumberFormat initialValue() {
@@ -142,7 +142,7 @@ public class TezVertexID extends TezID {
   protected StringBuilder appendTo(StringBuilder builder) {
     return dagId.appendTo(builder).
         append(SEPARATOR).
-        append(idFormat.get().format(id));
+        append(tezVertexIdFormat.get().format(id));
   }
 
   @Override
@@ -150,4 +150,21 @@ public class TezVertexID extends TezID {
     return dagId.hashCode() * 530017 + id;
   }
 
+  public static TezVertexID fromString(String vertexIdStr) {
+    try {
+      String[] split = vertexIdStr.split("_");
+      String rmId = split[1];
+      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
+      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
+      int id = tezVertexIdFormat.get().parse(split[4]).intValue();
+
+      return TezVertexID.getInstance(
+              TezDAGID.getInstance(rmId, appId, dagId),
+              id);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 2342150..4c98ed8 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -100,6 +100,10 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -109,14 +113,30 @@
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
       <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
         <executions>
           <execution>
+            <id>compile-protoc</id>
             <phase>generate-sources</phase>
             <goals>
-              <goal>protocol</goal>
+              <goal>protoc</goal>
             </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+                <param>${basedir}/../tez-api/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>HistoryEvents.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
           </execution>
         </executions>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/avro/HistoryEvents.avpr
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/avro/HistoryEvents.avpr b/tez-dag/src/main/avro/HistoryEvents.avpr
deleted file mode 100644
index ae03d8e..0000000
--- a/tez-dag/src/main/avro/HistoryEvents.avpr
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{"namespace": "org.apache.tez.dag.history.avro",
- "protocol": "Events",
-
- "types": [
-
-     {"type": "record", "name": "TezCounter",
-      "fields": [
-          {"name": "name", "type": "string"},
-          {"name": "displayName", "type": "string"},
-          {"name": "value", "type": "long"}
-      ]
-     },
-
-     {"type": "record", "name": "TezCounterGroup",
-      "fields": [
-          {"name": "name", "type": "string"},
-          {"name": "displayName", "type": "string"},
-          {"name": "counts", "type": {"type": "array", "items": "TezCounter"}}
-      ]
-     },
-
-     {"type": "record", "name": "TezCounters",
-      "fields": [
-          {"name": "groups", "type": {"type": "array", "items": "TezCounterGroup"}}
-      ]
-     },
-
-
-     {"type": "record", "name": "AMStarted",
-      "fields": [
-          {"name": "applicationAttemptId", "type": "string"},
-          {"name": "appSubmitTime", "type": "long"},
-          {"name": "initTime", "type": "long"},
-          {"name": "startTime", "type": "long"}
-      ]
-     },
-
-     {"type": "record", "name": "ContainerLaunched",
-      "fields": [
-          {"name": "containerId", "type": "string"},
-          {"name": "launchTime", "type": "long"}
-      ]
-     },
-
-     {"type": "record", "name": "DAGStarted",
-      "fields": [
-          {"name": "dagId", "type": "string"},
-          {"name": "initTime", "type": "long"},
-          {"name": "startTime", "type": "long"}
-      ]
-     },
-
-     {"type": "record", "name": "DAGFinished",
-      "fields": [
-          {"name": "dagId", "type": "string"},
-          {"name": "startTime", "type": "long"},
-          {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"},
-          {"name": "counters", "type": "TezCounters"}
-      ]
-     },
-
-     {"type": "record", "name": "VertexStarted",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "vertexId", "type": "string"},
-          {"name": "initRequestedTime", "type": "long"},
-          {"name": "initedTime", "type": "long"},
-          {"name": "startRequestedTime", "type": "long"},
-          {"name": "startedTime", "type": "long"},
-          {"name": "numTasks", "type": "long"},
-          {"name": "processorName", "type": "string"}
-      ]
-     },
-
-     {"type": "record", "name": "VertexFinished",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "vertexId", "type": "string"},
-          {"name": "initRequestedTime", "type": "long"},
-          {"name": "initedTime", "type": "long"},
-          {"name": "startRequestedTime", "type": "long"},
-          {"name": "startedTime", "type": "long"},
-          {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"},
-          {"name": "counters", "type": "TezCounters"}
-      ]
-     },
-
-     {"type": "record", "name": "TaskStarted",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "taskId", "type": "string"},
-          {"name": "scheduledTime", "type": "long"},
-          {"name": "launchTime", "type": "long"}
-      ]
-     },
-
-     {"type": "record", "name": "TaskFinished",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "taskId", "type": "string"},
-          {"name": "startTime", "type": "long"},
-          {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"},
-          {"name": "counters", "type": "TezCounters"}
-      ]
-     },
-
-     {"type": "record", "name": "TaskAttemptStarted",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "taskAttemptId", "type": "string"},
-          {"name": "startTime", "type": "long"},
-          {"name": "containerId", "type": "string"},
-          {"name": "nodeId", "type": "string"}
-      ]
-     },
-
-     {"type": "record", "name": "TaskAttemptFinished",
-      "fields": [
-          {"name": "vertexName", "type": "string"},
-          {"name": "taskAttemptId", "type": "string"},
-          {"name": "startTime", "type": "long"},
-          {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"},
-          {"name": "counters", "type": "TezCounters"}
-      ]
-     },
-
-     {"type": "enum", "name": "HistoryEventType",
-      "symbols": [
-          "AM_STARTED",
-          "DAG_STARTED",
-          "DAG_FINISHED",
-          "VERTEX_STARTED",
-          "VERTEX_FINISHED",
-          "TASK_STARTED",
-          "TASK_FINISHED",
-          "TASK_ATTEMPT_STARTED",
-          "TASK_ATTEMPT_FINISHED",
-          "CONTAINER_LAUNCHED"
-          ]
-     },
-
-     {"type": "record", "name": "HistoryEvent",
-      "fields": [
-          {"name": "type", "type": "HistoryEventType"},
-          {"name": "event",
-           "type": [
-               "AMStarted",
-               "ContainerLaunched",
-               "DAGStarted",
-               "DAGFinished",
-               "VertexStarted",
-               "VertexFinished",
-               "TaskStarted",
-               "TaskFinished",
-               "TaskAttemptStarted",
-               "TaskAttemptFinished"
-               ]
-          }
-      ]
-     }
-
- ],
-
- "messages": {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 638d3ef..89fa1a6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,6 +33,7 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
 
 
@@ -81,4 +83,11 @@ public interface AppContext {
   boolean isSession();
 
   DAGAppMasterState getAMState();
+
+  HistoryEventHandler getHistoryHandler();
+
+  Path getCurrentRecoveryDir();
+
+  boolean isRecoveryEnabled();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0210a9a..c19eb7c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -46,6 +46,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -123,12 +125,15 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventType;
 import org.apache.tez.dag.app.rm.node.AMNodeMap;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
 import com.google.common.annotations.VisibleForTesting;
+import org.codehaus.jettison.json.JSONException;
 
 /**
  * The Map-Reduce Application Master.
@@ -205,6 +210,11 @@ public class DAGAppMaster extends AbstractService {
   private long sessionTimeoutInterval;
   private long lastDAGCompletionTime;
   private Timer dagSubmissionTimer;
+  private boolean recoveryEnabled;
+  private Path recoveryDataDir;
+  private Path currentRecoveryDataDir;
+  private FileSystem recoveryFS;
+  private int recoveryBufferSize;
 
   // DAG Counter
   private final AtomicInteger dagCounter = new AtomicInteger();
@@ -318,12 +328,23 @@ public class DAGAppMaster extends AbstractService {
 
     historyEventHandler = new HistoryEventHandler(context);
     addIfService(historyEventHandler, true);
-    dispatcher.register(HistoryEventType.class, historyEventHandler);
 
     this.sessionTimeoutInterval = 1000 * amConf.getInt(
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
 
+    recoveryDataDir = FileSystem.get(conf).makeQualified(new Path(new Path(
+        conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
+        this.appAttemptID.getApplicationId().toString()),
+        TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME));
+    currentRecoveryDataDir = new Path(recoveryDataDir,
+        Integer.toString(this.appAttemptID.getAttemptId()));
+    recoveryFS = FileSystem.get(recoveryDataDir.toUri(), conf);
+    recoveryBufferSize = conf.getInt(
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
     if (isSession) {
       FileInputStream sessionResourcesStream = null;
       try {
@@ -340,9 +361,17 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+    recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
+
     initServices(conf);
     super.serviceInit(conf);
 
+    AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
+        startTime, appSubmitTime);
+    historyEventHandler.handle(
+        new DAGHistoryEvent(launchedEvent));
+
     this.state = DAGAppMasterState.INITED;
 
   }
@@ -392,18 +421,21 @@ public class DAGAppMaster extends AbstractService {
         lastDAGCompletionTime = clock.getTime();
         switch(finishEvt.getDAGState()) {
         case SUCCEEDED:
-          if (!currentDAG.getName().equals("PreWarmDAG")) {
+          if (!currentDAG.getName().startsWith(
+              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
             successfulDAGs.incrementAndGet();
           }
           break;
         case ERROR:
         case FAILED:
-          if (!currentDAG.getName().equals("PreWarmDAG")) {
+          if (!currentDAG.getName().startsWith(
+              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
             failedDAGs.incrementAndGet();
           }
           break;
         case KILLED:
-          if (!currentDAG.getName().equals("PreWarmDAG")) {
+          if (!currentDAG.getName().startsWith(
+              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
             killedDAGs.incrementAndGet();
           }
           break;
@@ -487,10 +519,16 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  /** Create and initialize (but don't start) a single dag. */
   protected DAG createDAG(DAGPlan dagPB) {
-    TezDAGID dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
-        dagCounter.incrementAndGet());
+    return createDAG(dagPB, null);
+  }
+
+  /** Create and initialize (but don't start) a single dag. */
+  protected DAG createDAG(DAGPlan dagPB, TezDAGID dagId) {
+    if (dagId == null) {
+      dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
+          dagCounter.incrementAndGet());
+    }
 
     Iterator<PlanKeyValuePair> iter =
         dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
@@ -517,6 +555,15 @@ public class DAGAppMaster extends AbstractService {
             appMasterUgi.getShortUserName(),
             taskHeartbeatHandler, context);
 
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
+            + ", json=" + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
+      }
+    } catch (JSONException e) {
+      LOG.warn("Failed to generate json for DAG", e);
+    }
+
     if (dagConf.getBoolean(TezConfiguration.TEZ_GENERATE_DAG_VIZ,
         TezConfiguration.TEZ_GENERATE_DAG_VIZ_DEFAULT)) {
       generateDAGVizFile(dagId, dagPB);
@@ -820,7 +867,9 @@ public class DAGAppMaster extends AbstractService {
     // Launch new pre-warm DAG
 
     org.apache.tez.dag.api.DAG dag =
-      new org.apache.tez.dag.api.DAG("PreWarmDAG");
+      new org.apache.tez.dag.api.DAG(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX +
+              Integer.toString(dagCounter.get() + 1));
     if (preWarmContext.getLocationHints().getNumTasks() <= 0) {
       LOG.warn("Ignoring pre-warm context as invalid numContainers specified: "
           + preWarmContext.getLocationHints().getNumTasks());
@@ -1044,6 +1093,21 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public HistoryEventHandler getHistoryHandler() {
+      return historyEventHandler;
+    }
+
+    @Override
+    public Path getCurrentRecoveryDir() {
+      return currentRecoveryDataDir;
+    }
+
+    @Override
+    public boolean isRecoveryEnabled() {
+      return recoveryEnabled;
+    }
+
+    @Override
     public Map<ApplicationAccessType, String> getApplicationACLs() {
       if (getServiceState() != STATE.STARTED) {
         throw new TezUncheckedException(
@@ -1258,8 +1322,8 @@ public class DAGAppMaster extends AbstractService {
 
     this.appsStartTime = clock.getTime();
     AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
-        startTime, appsStartTime, appSubmitTime);
-    dispatcher.getEventHandler().handle(
+        appsStartTime);
+    historyEventHandler.handle(
         new DAGHistoryEvent(startEvent));
 
     this.lastDAGCompletionTime = clock.getTime();
@@ -1270,6 +1334,14 @@ public class DAGAppMaster extends AbstractService {
       LOG.info("In Session mode. Waiting for DAG over RPC");
       this.state = DAGAppMasterState.IDLE;
 
+      if (recoveryEnabled) {
+        // YARN attempt ids start at 1; only later attempts have data to recover
+        if (this.appAttemptID.getAttemptId() > 1) {
+          this.state = DAGAppMasterState.RECOVERING;
+          // TODO recover data and copy it over into the new recovery dir
+        }
+      }
+
       this.dagSubmissionTimer = new Timer(true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
         @Override
@@ -1277,9 +1349,11 @@ public class DAGAppMaster extends AbstractService {
           checkAndHandleSessionTimeout();
         }
       }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
+
     }
   }
 
+
   @Override
   public synchronized void serviceStop() throws Exception {
     if (isSession) {
@@ -1370,7 +1444,8 @@ public class DAGAppMaster extends AbstractService {
   }
 
   private synchronized void checkAndHandleSessionTimeout() {
-    if (this.state.equals(DAGAppMasterState.RUNNING)
+    if (EnumSet.of(DAGAppMasterState.RUNNING,
+        DAGAppMasterState.RECOVERING).contains(this.state)
         || sessionStopped.get()) {
       // DAG running or session already completed, cannot timeout session
       return;
@@ -1497,6 +1572,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   private void startDAG(DAGPlan dagPlan) {
+    long submitTime = this.clock.getTime();
     this.state = DAGAppMasterState.RUNNING;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
@@ -1513,6 +1589,9 @@ public class DAGAppMaster extends AbstractService {
 
     // /////////////////// Create the job itself.
     DAG newDAG = createDAG(dagPlan);
+    DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
+        submitTime, dagPlan, this.appAttemptID);
+    historyEventHandler.handle(new DAGHistoryEvent(newDAG.getID(), submittedEvent));
     startDAG(newDAG);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
index a410c0a..238ff4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
 public enum DAGAppMasterState {
   NEW,
   INITED,
+  RECOVERING,
   IDLE,
   RUNNING,
   SUCCEEDED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 03fc0b2..f986eb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.records.TezTaskID;
@@ -109,4 +110,7 @@ public interface Vertex extends Comparable<Vertex> {
   public DAG getDAG();
   VertexTerminationCause getTerminationCause();
 
+  // TODO remove this once RootInputVertexManager is fixed to not use
+  // internal apis
+  AppContext getAppContext();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 85f102c..04ed223 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -95,6 +94,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -821,17 +821,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
-    this.eventHandler.handle(
-        new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(dagId, finishEvt));
   }
 
   void logJobHistoryInitedEvent() {
-    // FIXME should we have more information in this event?
-    // numVertices, etc?
+    DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
+        this.initTime);
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(dagId, initEvt));
+  }
+
+  void logJobHistoryStartedEvent() {
     DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
-        this.initTime, this.startTime);
-    this.eventHandler.handle(
-        new DAGHistoryEvent(startEvt));
+        this.startTime);
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(dagId, startEvt));
   }
 
   void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
@@ -839,8 +844,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
         getAllCounters());
-    this.eventHandler.handle(
-        new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(dagId, finishEvt));
   }
 
   static DAGState checkDAGForCompletion(DAGImpl dag) {
@@ -1135,6 +1140,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
       // TODO Metrics
       //dag.metrics.endPreparingJob(dag);
+      dag.logJobHistoryInitedEvent();
       return DAGState.INITED;
 
 
@@ -1227,7 +1233,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     @Override
     public void transition(DAGImpl dag, DAGEvent event) {
       dag.startTime = dag.clock.getTime();
-      dag.logJobHistoryInitedEvent();
+      dag.logJobHistoryStartedEvent();
       // TODO Metrics
       //job.metrics.runningJob(job);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index fec97d2..7c0991c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +31,8 @@ import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
@@ -119,6 +122,19 @@ public class RootInputVertexManager implements VertexManagerPlugin {
         Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) != 0);
         TezEvent tezEvent = new TezEvent(event, sourceInfo);
         tezEvent.setDestinationInfo(destInfoMap.get(inputName));
+        // FIXME the event should be sent via the context and not directly to a
+        // task
+        // FIXME event handler should not be exposed to plugins
+        // When recovery is enabled, also record the routed event with the history handler.
+        if (deleteVertex.getAppContext().isRecoveryEnabled()) {
+          VertexDataMovementEventsGeneratedEvent historyEvent =
+              new VertexDataMovementEventsGeneratedEvent(deleteVertex.getVertexId(),
+                  Arrays.asList(tezEvent));
+          // FIXME should not have access to app context
+          deleteVertex.getAppContext().getHistoryHandler().handle(
+              new DAGHistoryEvent(deleteVertex.getDAG().getID(), historyEvent));
+        }
+
         sendEventToTask(TezTaskID.getInstance(deleteVertex.getVertexId(),
             ((RootInputDataInformationEvent) event).getIndex()), tezEvent);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 53ba8ad..02cbe38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -768,7 +768,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         attemptId, getTask().getVertex().getName(),
         launchTime, containerId, containerNodeId,
         inProgressLogsUrl, completedLogsUrl);
-    eventHandler.handle(new DAGHistoryEvent(startEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGID(), startEvt));
   }
 
   @SuppressWarnings("unchecked")
@@ -781,7 +782,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         getFinishTime(), TaskAttemptState.SUCCEEDED, "",
         getCounters());
     // FIXME how do we store information regd completion events
-    eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGID(), finishEvt));
   }
 
   @SuppressWarnings("unchecked")
@@ -794,7 +796,8 @@ public class TaskAttemptImpl implements TaskAttempt,
             LINE_SEPARATOR, getDiagnostics()),
         getCounters());
     // FIXME how do we store information regd completion events
-    eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGID(), finishEvt));
   }
 
   //////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9451d6f..463d23b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -845,7 +845,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected void logJobHistoryTaskStartedEvent() {
     TaskStartedEvent startEvt = new TaskStartedEvent(taskId,
         getVertex().getName(), scheduledTime, getLaunchTime());
-    this.eventHandler.handle(new DAGHistoryEvent(startEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(taskId.getVertexID().getDAGId(), startEvt));
   }
 
   protected void logJobHistoryTaskFinishedEvent() {
@@ -854,14 +855,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), getLaunchTime(), clock.getTime(),
         TaskState.SUCCEEDED, getCounters());
-    this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
   }
 
   protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), getLaunchTime(), clock.getTime(),
         finalState, getCounters());
-    this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
   }
 
   private static class InitialScheduleTransition

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0ed07a9..26e44ef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -107,7 +107,9 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
@@ -786,6 +788,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  @Override
+  public AppContext getAppContext() {
+    return this.appContext;
+  }
+
   // TODO Create InputReadyVertexManager that schedules when there is something
   // to read and use that as default instead of ImmediateStart.TEZ-480
   @Override
@@ -1014,11 +1021,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
 
-  void logJobHistoryVertexStartedEvent() {
-    VertexStartedEvent startEvt = new VertexStartedEvent(vertexId, vertexName,
-        initTimeRequested, initedTime, startTimeRequested, startedTime, numTasks,
+  void logJobHistoryVertexInitializedEvent() {
+    VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
+        initTimeRequested, initedTime, numTasks,
         getProcessorName());
-    this.eventHandler.handle(new DAGHistoryEvent(startEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGId(), initEvt));
+  }
+
+  void logJobHistoryVertexStartedEvent() {
+    VertexStartedEvent startEvt = new VertexStartedEvent(vertexId,
+        startTimeRequested, startedTime);
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGId(), startEvt));
   }
 
   void logJobHistoryVertexFinishedEvent() {
@@ -1027,7 +1042,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
         getAllCounters());
-    this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
   void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
@@ -1035,7 +1051,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
             getDiagnostics()), getAllCounters());
-    this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
+    this.appContext.getHistoryHandler().handle(
+        new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
   static VertexState checkVertexForCompletion(final VertexImpl vertex) {
@@ -1250,6 +1267,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // TODO: Metrics
     initedTime = clock.getTime();
 
+    logJobHistoryVertexInitializedEvent();
     return VertexState.INITED;
   }
 
@@ -2008,6 +2026,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
       List<TezEvent> tezEvents = rEvent.getEvents();
+
+      if (vertex.getAppContext().isRecoveryEnabled()
+          && !tezEvents.isEmpty()) {
+        List<TezEvent> dataMovementEvents =
+            Lists.newArrayList();
+        for (TezEvent tezEvent : tezEvents) {
+          if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
+            continue;
+          }
+          if  (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
+            || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
+            || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+            dataMovementEvents.add(tezEvent);
+          }
+        }
+        if (!dataMovementEvents.isEmpty()) {
+          VertexDataMovementEventsGeneratedEvent historyEvent =
+              new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
+                  dataMovementEvents);
+          vertex.appContext.getHistoryHandler().handle(
+              new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
+        }
+      }
       for(TezEvent tezEvent : tezEvents) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Vertex: " + vertex.getName() + " routing event: "

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 75f67ad..437d057 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -172,8 +172,9 @@ public class ContainerLauncherImpl extends AbstractService implements
         context.getEventHandler().handle(
             new AMContainerEventLaunched(containerID));
         ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
-            containerID, clock.getTime());
-        context.getEventHandler().handle(new DAGHistoryEvent(lEvt));
+            containerID, clock.getTime(), context.getApplicationAttemptId());
+        context.getHistoryHandler().handle(new DAGHistoryEvent(
+            context.getCurrentDAGID(), lEvt));
 
         this.state = ContainerState.RUNNING;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java
index ebb6234..54a686d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/DAGHistoryEvent.java
@@ -19,19 +19,29 @@
 package org.apache.tez.dag.history;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
 
 public class DAGHistoryEvent extends AbstractEvent<HistoryEventType>{
 
   private final HistoryEvent historyEvent;
+  private final TezDAGID dagID;
 
-  public DAGHistoryEvent(HistoryEvent historyEvent) {
+  public DAGHistoryEvent(TezDAGID dagID,
+      HistoryEvent historyEvent) {
     super(historyEvent.getEventType());
+    this.dagID = dagID;
     this.historyEvent = historyEvent;
   }
-  
+
+  public DAGHistoryEvent(HistoryEvent historyEvent) {
+    this(null, historyEvent);
+  }
+
   public HistoryEvent getHistoryEvent() {
     return historyEvent;
   }
 
+  public TezDAGID getDagID() {
+    return this.dagID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index df46426..78d1208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,14 +18,24 @@
 
 package org.apache.tez.dag.history;
 
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 public interface HistoryEvent {
 
   HistoryEventType getEventType();
 
-  Object getBlob();
+  JSONObject convertToATSJSON() throws JSONException;
+  /** @return true if HistoryEventHandler should forward this event to the recovery service. */
+  boolean isRecoveryEvent();
+  /** @return true if HistoryEventHandler should forward this event to the ATS service. */
+  boolean isHistoryEvent();
 
-  void setBlob(Object blob);
+  void toProtoStream(OutputStream outputStream) throws IOException;
+  // NOTE(review): presumably the inverse of toProtoStream (reads event state) - confirm.
+  void fromProtoStream(InputStream inputStream) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 3543259..b64cf1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -20,17 +20,26 @@ package org.apache.tez.dag.history;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.ats.ATSService;
+import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
 
-public class HistoryEventHandler extends AbstractService
-implements EventHandler<DAGHistoryEvent> {
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HistoryEventHandler extends CompositeService {
 
   private static Log LOG = LogFactory.getLog(HistoryEventHandler.class);
 
   private final AppContext context;
+  private boolean yarnATSEnabled;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private ATSService atsService;
+  private RecoveryService recoveryService;
+  private boolean recoveryEnabled;
 
   public HistoryEventHandler(AppContext context) {
     super(HistoryEventHandler.class.getName());
@@ -38,26 +47,63 @@ implements EventHandler<DAGHistoryEvent> {
   }
 
   @Override
-  public void serviceStart() {
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing HistoryEventHandler");
+    this.yarnATSEnabled = context.getAMConf().getBoolean(TezConfiguration.YARN_ATS_ENABLED,
+        TezConfiguration.YARN_ATS_ENABLED_DEFAULT);
+    this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
+    if (yarnATSEnabled) {
+      atsService = new ATSService();
+      addService(atsService);
+    }
+    if (recoveryEnabled) {
+      recoveryService = new RecoveryService(context);
+      addService(recoveryService);
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
     LOG.info("Starting HistoryEventHandler");
+    super.serviceStart();
   }
 
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
     LOG.info("Stopping HistoryEventHandler");
+    super.serviceStop();
   }
 
-  @Override
   public void handle(DAGHistoryEvent event) {
-    TezDAGID dagId = context.getCurrentDAGID();
+    TezDAGID dagId = event.getDagID();
     String dagIdStr = "N/A";
     if(dagId != null) {
-      dagIdStr = context.getCurrentDAGID().toString();
+      dagIdStr = dagId.toString();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling history event"
+          + ", eventType=" + event.getHistoryEvent().getEventType());
+    }
+    if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
+      recoveryService.handle(event);
     }
+    if (yarnATSEnabled && event.getHistoryEvent().isHistoryEvent()) {
+      atsService.handle(event);
+    }
+
+    // TODO at some point we should look at removing this once
+    // there is a UI in place
     LOG.info("[HISTORY]"
         + "[DAG:" + dagIdStr + "]"
         + "[Event:" + event.getType().name() + "]"
-        + ": " + event.getHistoryEvent().getBlob().toString());
+        + ": " + event.getHistoryEvent().toString());
   }
 
+
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
new file mode 100644
index 0000000..a71686b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history;
+
+public enum HistoryEventType {
+  AM_LAUNCHED,
+  AM_STARTED,
+  DAG_SUBMITTED,
+  DAG_INITIALIZED,
+  DAG_STARTED,
+  DAG_FINISHED,
+  VERTEX_INITIALIZED,
+  VERTEX_STARTED,
+  VERTEX_FINISHED,
+  TASK_STARTED,
+  TASK_FINISHED,
+  TASK_ATTEMPT_STARTED,
+  TASK_ATTEMPT_FINISHED,
+  CONTAINER_LAUNCHED,
+  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
new file mode 100644
index 0000000..587ee3e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface SummaryEvent {
+
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
new file mode 100644
index 0000000..4ec0632
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Service that queues {@link DAGHistoryEvent}s and processes them one at a
+ * time on a single background thread.
+ *
+ * <p>Publishing to the timeline server (ATS) is not implemented yet: the
+ * per-event handler is currently a no-op. Events that are still queued when
+ * the service stops are not processed.
+ */
+public class ATSService extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(ATSService.class);
+
+  /** Number of loop iterations between two queue-statistics log lines. */
+  private static final int STATS_LOG_INTERVAL = 1000;
+
+  // Created without a capacity bound, so handle() does not block callers.
+  private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+
+  // NOTE(review): historyCounter and outputFilePrefix are not used yet.
+  private final AtomicInteger historyCounter =
+      new AtomicInteger(0);
+  private String outputFilePrefix;
+  private Thread eventHandlingThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  // Only read and written on the event handling thread.
+  private int eventCounter = 0;
+  private int eventsProcessed = 0;
+  private final Object lock = new Object();
+
+  public ATSService() {
+    super(ATSService.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    // No configuration is read yet.
+  }
+
+  /** Starts the single background thread that drains the event queue. */
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the event-queue every so often.
+          if (eventCounter != 0 && eventCounter % STATS_LOG_INTERVAL == 0) {
+            LOG.info("Event queue stats"
+                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                + ", eventQueueSize=" + eventQueue.size());
+            eventCounter = 0;
+            eventsProcessed = 0;
+          } else {
+            ++eventCounter;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            // Restore the interrupt status rather than swallowing it.
+            Thread.currentThread().interrupt();
+            return;
+          }
+
+          synchronized (lock) {
+            ++eventsProcessed;
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              // TODO handle failures - treat as fatal or ignore?
+              LOG.warn("Error handling event", e);
+            }
+          }
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  /**
+   * Asks the event handling thread to exit and interrupts it. Does not wait
+   * for the thread to finish; events still in the queue are not processed.
+   */
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService");
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+  }
+
+  /**
+   * Queues an event for asynchronous handling by the event handling thread.
+   *
+   * @param event the event to queue; must not be null
+   */
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  // Placeholder for publishing one event; only its type is looked up so far.
+  private void handleEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+    try {
+      // TODO integrate with ATS
+    } catch (Exception e) {
+      LOG.warn("Could not handle history event, eventType="
+          + eventType, e);
+      // TODO handle error as a fatal event or ignore/skip?
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
new file mode 100644
index 0000000..a7f0208
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats;
+
+/**
+ * Entity types that Tez history events report when they are converted to
+ * timeline server (ATS) JSON.
+ *
+ * <p>History events emit {@code name()} as the entity type string, so
+ * renaming a constant changes the entity type in the generated JSON.
+ */
+public enum EntityTypes {
+  TEZ_APPLICATION_ATTEMPT,
+  TEZ_CONTAINER_ID,
+  TEZ_DAG_ID,
+  TEZ_VERTEX_ID,
+  TEZ_TASK_ID,
+  TEZ_TASK_ATTEMPT_ID
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
new file mode 100644
index 0000000..4794a7b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.ats.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.AMLaunchedProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * History event recording the launch of an application master attempt.
+ *
+ * <p>Reported both as a history event and as a recovery event; for recovery
+ * it is serialized as a length-delimited {@code AMLaunchedProto}.
+ */
+public class AMLaunchedEvent implements HistoryEvent {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private long launchTime;
+  private long appSubmitTime;
+
+  /**
+   * Creates an empty event to be populated by {@link #fromProto} or
+   * {@link #fromProtoStream}. Until then the attempt id is null, so
+   * {@link #toProto()} and {@link #convertToATSJSON()} would throw
+   * a NullPointerException.
+   */
+  public AMLaunchedEvent() {
+  }
+
+  /**
+   * @param appAttemptId the application attempt whose AM was launched
+   * @param launchTime the AM launch time
+   * @param appSubmitTime the application submission time
+   */
+  public AMLaunchedEvent(ApplicationAttemptId appAttemptId,
+      long launchTime, long appSubmitTime) {
+    this.applicationAttemptId = appAttemptId;
+    this.launchTime = launchTime;
+    this.appSubmitTime = appSubmitTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.AM_LAUNCHED;
+  }
+
+  /**
+   * Builds the timeline (ATS) JSON for this event: a Tez application-attempt
+   * entity related to its YARN application and attempt, with a single
+   * AM_LAUNCHED event at {@code launchTime} and the submit time as other info.
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + applicationAttemptId.toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, launchTime);
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_LAUNCHED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info to tag with Tez AM
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.APP_SUBMIT_TIME, appSubmitTime);
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "appAttemptId=" + applicationAttemptId
+        + ", appSubmitTime=" + appSubmitTime
+        + ", launchTime=" + launchTime;
+  }
+
+  /** Converts this event to its recovery protobuf message. */
+  public AMLaunchedProto toProto() {
+    return AMLaunchedProto.newBuilder()
+        .setApplicationAttemptId(this.applicationAttemptId.toString())
+        .setAppSubmitTime(appSubmitTime)
+        .setLaunchTime(launchTime)
+        .build();
+  }
+
+  /** Populates this event from a recovery protobuf message. */
+  public void fromProto(AMLaunchedProto proto) {
+    this.applicationAttemptId =
+        ConverterUtils.toApplicationAttemptId(proto.getApplicationAttemptId());
+    this.launchTime = proto.getLaunchTime();
+    this.appSubmitTime = proto.getAppSubmitTime();
+  }
+
+  /** Writes this event as one length-delimited {@code AMLaunchedProto}. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited {@code AMLaunchedProto} and populates this
+   * event from it.
+   *
+   * @throws EOFException if the stream is already at its end
+   * @throws IOException if reading or parsing the message fails
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      // parseDelimitedFrom signals end-of-stream with null; report it as an
+      // IOException instead of failing with an NPE inside fromProto.
+      throw new EOFException(
+          "Unexpected end of stream while reading AMLaunchedEvent");
+    }
+    fromProto(proto);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index eda953e..b3cbb5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -19,20 +19,32 @@
 package org.apache.tez.dag.history.events;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.avro.AMStarted;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.ats.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.AMStartedProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 public class AMStartedEvent implements HistoryEvent {
 
-  private AMStarted datum = new AMStarted();
+  private ApplicationAttemptId applicationAttemptId;
+  private long startTime;
+
+  public AMStartedEvent() {
+  }
 
   public AMStartedEvent(ApplicationAttemptId appAttemptId,
-      long initTime, long startTime, long appSubmitTime) {
-    datum.applicationAttemptId = appAttemptId.toString();
-    datum.initTime = initTime;
-    datum.startTime = startTime;
-    datum.appSubmitTime = appSubmitTime;
+      long startTime) {
+    this.applicationAttemptId = appAttemptId;
+    this.startTime = startTime;
   }
 
   @Override
@@ -41,21 +53,80 @@ public class AMStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public Object getBlob() {
-    // TODO Auto-generated method stub
-    return this.toString();
+  public JSONObject convertToATSJSON() throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + applicationAttemptId.toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, startTime);
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
   }
 
   @Override
-  public void setBlob(Object blob) {
-    this.datum = (AMStarted) blob;
+  public boolean isHistoryEvent() {
+    return true;
   }
 
   @Override
   public String toString() {
-    return "appAttemptId=" + datum.applicationAttemptId
-        + ", appSubmitTime=" + datum.appSubmitTime
-        + ", initTime=" + datum.initTime
-        + ", startTime=" + datum.startTime;
+    return "appAttemptId=" + applicationAttemptId
+        + ", startTime=" + startTime;
+  }
+
+  public AMStartedProto toProto() {
+    return AMStartedProto.newBuilder()
+        .setApplicationAttemptId(this.applicationAttemptId.toString())
+        .setStartTime(startTime)
+        .build();
+  }
+
+  public void fromProto(AMStartedProto proto) {
+    this.applicationAttemptId =
+        ConverterUtils.toApplicationAttemptId(proto.getApplicationAttemptId());
+    this.startTime = proto.getStartTime();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
deleted file mode 100644
index 3587964..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.util.ArrayList;
-
-import org.apache.avro.util.Utf8;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.dag.history.avro.TezCounter;
-import org.apache.tez.dag.history.avro.TezCounterGroup;
-import org.apache.tez.dag.history.avro.TezCounters;
-
-public class AvroUtils {
-
-  public static TezCounters toAvro(
-      org.apache.tez.common.counters.TezCounters counters) {
-    TezCounters result = new TezCounters();
-    result.groups = new ArrayList<TezCounterGroup>(0);
-    if (counters == null) return result;
-    for (CounterGroup group : counters) {
-      TezCounterGroup g = new TezCounterGroup();
-      g.name = new Utf8(group.getName());
-      g.displayName = new Utf8(group.getDisplayName());
-      g.counts = new ArrayList<TezCounter>(group.size());
-      for (org.apache.tez.common.counters.TezCounter counter : group) {
-        TezCounter c = new TezCounter();
-        c.name = new Utf8(counter.getName());
-        c.displayName = new Utf8(counter.getDisplayName());
-        c.value = counter.getValue();
-        g.counts.add(c);
-      }
-      result.groups.add(g);
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index fbca34b..066f315 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -18,19 +18,37 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.avro.ContainerLaunched;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.ats.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerLaunchedProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 public class ContainerLaunchedEvent implements HistoryEvent {
 
-  private ContainerLaunched datum = new ContainerLaunched();
+  private ContainerId containerId;
+  private long launchTime;
+  private ApplicationAttemptId applicationAttemptId;
+
+  public ContainerLaunchedEvent() {
+  }
 
   public ContainerLaunchedEvent(ContainerId containerId,
-      long launchTime) {
-    datum.containerId = containerId.toString();
-    datum.launchTime = launchTime;
+      long launchTime,
+      ApplicationAttemptId applicationAttemptId) {
+    this.containerId = containerId;
+    this.launchTime = launchTime;
+    this.applicationAttemptId = applicationAttemptId;
   }
 
   @Override
@@ -39,20 +57,85 @@ public class ContainerLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public Object getBlob() {
-    // TODO Auto-generated method stub
-    return this.toString();
+  public JSONObject convertToATSJSON() throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, "tez_" + containerId.toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    // Related entities. The Tez application-attempt entity is keyed as
+    // "tez_" + attemptId (see AMLaunchedEvent), so reference that same id.
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        "tez_" + applicationAttemptId.toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject launchEvent = new JSONObject();
+    launchEvent.put(ATSConstants.TIMESTAMP, launchTime);
+    launchEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_LAUNCHED.name());
+    events.put(launchEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    return jsonObject;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return false;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  public ContainerLaunchedProto toProto() {
+    return ContainerLaunchedProto.newBuilder()
+        .setApplicationAttemptId(applicationAttemptId.toString())
+        .setContainerId(containerId.toString())
+        .setLaunchTime(launchTime)
+        .build();
+  }
+
+  public void fromProto(ContainerLaunchedProto proto) {
+    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    launchTime = proto.getLaunchTime();
+    this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
+        proto.getApplicationAttemptId());
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void setBlob(Object blob) {
-    this.datum = (ContainerLaunched) blob;
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    ContainerLaunchedProto proto =
+        ContainerLaunchedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
   }
 
   @Override
   public String toString() {
-    return "containerId=" + datum.containerId
-        + ", launchTime=" + datum.launchTime;
+    return "containerId=" + containerId
+        + ", launchTime=" + launchTime;
   }
 
 }


Mime
View raw message