tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [3/6] tez git commit: TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh)
Date Wed, 11 Mar 2015 04:38:57 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats
deleted file mode 120000
index 5ef7ca9..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats
+++ /dev/null
@@ -1 +0,0 @@
-../../../../../../../../../../tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats
\ No newline at end of file
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..c68d395
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>(); // filled by handle(), drained by getEventBatch()
+
+  private Thread eventHandlingThread; // created and started in serviceStart()
+  private AtomicBoolean stopped = new AtomicBoolean(false); // set in serviceStop() to end the handling loop
+  private int eventCounter = 0; // handling-loop iterations since the last queue-stats check
+  private int eventsProcessed = 0; // events handled since the last queue-stats check
+  private final Object lock = new Object(); // held while a batch is fetched and published
+
+  @VisibleForTesting
+  TimelineClient timelineClient; // created in serviceInit()
+
+  private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>(); // pre-warm DAGs whose events are dropped
+  private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>(); // DAG -> Timeline domain id
+  private long maxTimeToWaitOnShutdown; // added to the clock time in serviceStop() to bound the final flush
+  private boolean waitForeverOnShutdown = false; // set when the configured flush timeout is negative
+
+  private int maxEventsPerBatch; // upper bound on events collected by one getEventBatch() call
+  private long maxPollingTimeMillis; // timeout of each queue poll, in milliseconds
+
+  private String sessionDomainId; // domain applied when no DAG-specific domain is recorded
+  private static final String atsHistoryACLManagerClassName =
+      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
+  private HistoryACLPolicyManager historyACLPolicyManager; // null when the ACL manager could not be created
+
+  public ATSHistoryLoggingService() {
+    super(ATSHistoryLoggingService.class.getName()); // hands this class's name to the parent constructor
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+    maxTimeToWaitOnShutdown = conf.getLong(
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+    maxEventsPerBatch = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH,
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT);
+    maxPollingTimeMillis = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT,
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT);
+    if (maxTimeToWaitOnShutdown < 0) {
+      waitForeverOnShutdown = true;
+    }
+    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+
+    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    try {
+      historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+          atsHistoryACLManagerClassName);
+      historyACLPolicyManager.setConf(conf);
+    } catch (TezUncheckedException e) {
+      LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+          + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+      if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+          TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
+        throw e;
+      }
+      historyACLPolicyManager = null;
+    }
+
+  }
+
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    timelineClient.start();
+
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+        boolean interrupted = false;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()
+              && !interrupted) {
+
+          // Log the size of the event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            if (eventsProcessed != 0 && !events.isEmpty()) {
+              LOG.info("Event queue stats"
+                  + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                  + ", eventQueueSize=" + eventQueue.size());
+            }
+            eventCounter = 0;
+            eventsProcessed = 0;
+          } else {
+            ++eventCounter;
+          }
+
+          synchronized (lock) {
+            try {
+              getEventBatch(events);
+            } catch (InterruptedException e) {
+              // Finish processing events and then return
+              interrupted = true;
+            }
+
+            if (events.isEmpty()) {
+              continue;
+            }
+
+            eventsProcessed += events.size();
+            try {
+              handleEvents(events);
+            } catch (Exception e) {
+              LOG.warn("Error handling events", e);
+            }
+          }
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  /**
+   * Signals the event-handling thread to stop, then flushes the events still queued to
+   * Timeline for at most the configured flush timeout (without a deadline when waiting
+   * forever is enabled), and finally stops the Timeline client.
+   *
+   * <p>The method tolerates being called when {@code serviceInit} never created the
+   * Timeline client.
+   */
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    // The handling thread holds this lock while it fetches and publishes a batch.
+    synchronized (lock) {
+      if (!eventQueue.isEmpty()) {
+        LOG.warn("ATSService being stopped"
+            + ", eventQueueBacklog=" + eventQueue.size()
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown
+            + ", waitForever=" + waitForeverOnShutdown);
+        long startTime = appContext.getClock().getTime();
+        long endTime = startTime + maxTimeToWaitOnShutdown;
+        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+        while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) {
+          try {
+            getEventBatch(events);
+          } catch (InterruptedException e) {
+            // Events collected before the interrupt are still published below.
+            LOG.info("ATSService interrupted while shutting down. Exiting."
+                  + " EventQueueBacklog=" + eventQueue.size());
+          }
+          if (events.isEmpty()) {
+            LOG.info("Event queue empty, stopping ATS Service");
+            break;
+          }
+          try {
+            handleEvents(events);
+          } catch (Exception e) {
+            LOG.warn("Error handling event", e);
+            break;
+          }
+        }
+      }
+    }
+    if (!eventQueue.isEmpty()) {
+      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+          + ", eventQueueBacklog=" + eventQueue.size());
+    }
+    // Stop can be requested after a failed or skipped init, when no client exists yet.
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
+  }
+
+  /**
+   * Refills {@code events} with the next batch from the queue. Collection stops once
+   * {@code maxEventsPerBatch} valid events were gathered, a poll times out, or a
+   * DAG_SUBMITTED event was added. Events rejected by {@code isValidEvent} are dropped
+   * and do not count towards the batch size.
+   *
+   * @param events list that is cleared and then filled with the batch
+   * @throws InterruptedException if the thread is interrupted while polling the queue;
+   *         events gathered before the interrupt remain in {@code events}
+   */
+  private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
+    events.clear();
+    int accepted = 0;
+    while (accepted < maxEventsPerBatch) {
+      DAGHistoryEvent next = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS);
+      if (next == null) {
+        // Nothing arrived within the polling window.
+        break;
+      }
+      if (isValidEvent(next)) {
+        events.add(next);
+        ++accepted;
+        if (next.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
+          // Send a DAG submission as the last event of its batch: its payload may be large.
+          break;
+        }
+      }
+    }
+  }
+
+
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event); // enqueue only; getEventBatch() validates and handleEvents() publishes later
+  }
+
+  private boolean isValidEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+    TezDAGID dagId = event.getDagID();
+
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return false;
+      }
+      if (historyACLPolicyManager != null) {
+        String dagDomainId = dagSubmittedEvent.getConf().get(
+            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+        if (dagDomainId != null) {
+          dagDomainIdMap.put(dagId, dagDomainId);
+        }
+      }
+    }
+    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+      // Remove from set to keep size small
+      // No more events should be seen after this point.
+      if (skippedDAGs.remove(dagId)) {
+        return false;
+      }
+    }
+
+    if (dagId != null && skippedDAGs.contains(dagId)) {
+      // Skip pre-warm DAGs
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Converts a batch of history events into Timeline entities, assigns each entity its
+   * Timeline domain when an ACL policy manager is in use, and publishes the batch with
+   * a single put. Per-entity errors reported by Timeline and exceptions raised by the
+   * put are logged and not rethrown; a conversion failure propagates to the caller.
+   *
+   * @param events the events to publish
+   */
+  private void handleEvents(List<DAGHistoryEvent> events) {
+    TimelineEntity[] entities = new TimelineEntity[events.size()];
+    int index = 0;
+    // Iterate instead of calling get(i): callers pass a LinkedList.
+    for (DAGHistoryEvent event : events) {
+      String domainId = sessionDomainId;
+      TezDAGID dagId = event.getDagID();
+
+      // A domain recorded for the DAG takes precedence over the session domain.
+      if (historyACLPolicyManager != null && dagId != null
+          && dagDomainIdMap.containsKey(dagId)) {
+        domainId = dagDomainIdMap.get(dagId);
+      }
+
+      TimelineEntity entity =
+          HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+      if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+      }
+      entities[index++] = entity;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending event batch to Timeline, batchSize=" + events.size());
+    }
+    try {
+      TimelinePutResponse response =
+          timelineClient.putEntities(entities);
+      if (response != null
+        && !response.getErrors().isEmpty()) {
+        for (TimelinePutError err : response.getErrors()) {
+          if (err.getErrorCode() != 0) {
+            // Positions in the error list do not correspond to positions in the batch,
+            // so the failed entity is identified from the error itself.
+            LOG.warn("Could not post history event to ATS"
+                + ", atsPutError=" + err.getErrorCode()
+                + ", entityId=" + err.getEntityId()
+                + ", entityType=" + err.getEntityType());
+          }
+        }
+      }
+      // Do nothing additional, ATS client library should handle throttling
+      // or auto-disable as needed
+    } catch (Exception e) {
+      LOG.warn("Could not handle history events", e);
+    }
+  }
+
+}
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
new file mode 100644
index 0000000..ca47b92
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class HistoryEventTimelineConversion {
+
+  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    TimelineEntity timelineEntity;
+    switch (historyEvent.getEventType()) {
+      case APP_LAUNCHED:
+        timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+        break;
+      case AM_LAUNCHED:
+        timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+        break;
+      case AM_STARTED:
+        timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
+        break;
+      case CONTAINER_LAUNCHED:
+        timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+        break;
+      case CONTAINER_STOPPED:
+        timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+        break;
+      case DAG_SUBMITTED:
+        timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+        break;
+      case DAG_INITIALIZED:
+        timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+        break;
+      case DAG_STARTED:
+        timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+        break;
+      case DAG_FINISHED:
+        timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+        break;
+      case VERTEX_INITIALIZED:
+        timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+        break;
+      case VERTEX_STARTED:
+        timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+        break;
+      case VERTEX_FINISHED:
+        timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+      break;
+      case TASK_STARTED:
+        timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+        break;
+      case TASK_FINISHED:
+        timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_STARTED:
+        timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+        break;
+      case VERTEX_PARALLELISM_UPDATED:
+        timelineEntity = convertVertexParallelismUpdatedEvent(
+            (VertexParallelismUpdatedEvent) historyEvent);
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+    return timelineEntity;
+  }
+
+  private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    atsEntity.addOtherInfo(ATSConstants.CONFIG,
+        DAGUtils.convertConfigurationToATSMap(event.getConf()));
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    if (event.getVersion() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION,
+          DAGUtils.convertTezVersionToATSMap(event.getVersion()));
+    }
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.AM_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID,
+        event.getContainerId().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    // In case, a container is stopped in a different attempt
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+
+    TimelineEvent stoppedEvt = new TimelineEvent();
+    stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
+    stoppedEvt.setTimestamp(event.getStoppedTime());
+    atsEntity.addEvent(stoppedEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus());
+
+    atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
+
+    return atsEntity;
+  }
+
+  /**
+   * Builds the TEZ_DAG_ID entity update for a finished DAG: a DAG_FINISHED event,
+   * user/application/name/status filters, and timing, status, diagnostics, counters and
+   * the per-DAG task statistics (when present) as other info.
+   *
+   * @param event the DAG-finished event
+   * @return the populated Timeline entity
+   */
+  private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
+    final String dagId = event.getDagID().toString();
+    final String appId = event.getDagID().getApplicationId().toString();
+    final String finalState = event.getState().name();
+
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(dagId);
+    entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent finished = new TimelineEvent();
+    finished.setEventType(HistoryEventType.DAG_FINISHED.name());
+    finished.setTimestamp(event.getFinishTime());
+    entity.addEvent(finished);
+
+    entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    entity.addPrimaryFilter(ATSConstants.APPLICATION_ID, appId);
+    entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+    entity.addPrimaryFilter(ATSConstants.STATUS, finalState);
+
+    entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    entity.addOtherInfo(ATSConstants.STATUS, finalState);
+    entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    entity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    // Task statistics are added last, each under its own key.
+    final Map<String, Integer> taskStats = event.getDagTaskStats();
+    if (taskStats != null) {
+      for (Entry<String, Integer> stat : taskStats.entrySet()) {
+        entity.addOtherInfo(stat.getKey(), stat.getValue());
+      }
+    }
+    return entity;
+  }
+
+  private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+
+    if (event.getVertexNameIDMap() != null) {
+      Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+      }
+      atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+    }
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.DAG_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(),
+        "tez_" + event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+    submitEvt.setTimestamp(event.getSubmitTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.setStartTime(event.getSubmitTime());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+
+    try {
+      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskAttemptError() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+    }
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getSuccessfulAttemptID() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+          event.getSuccessfulAttemptID().toString());
+    }
+
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name());
+
+    return atsEntity;
+  }
+
+  /**
+   * Converts a vertex finish event into the Timeline entity of that vertex.
+   *
+   * <p>The application and DAG ids and the final state become primary filters. A
+   * VERTEX_FINISHED timeline event is stamped with the finish time; finish time, elapsed
+   * time (finish minus start), status, diagnostics, counters and vertex stats are stored as
+   * other info. When the event carries per-vertex task statistics, each entry is copied into
+   * other info under its own key.
+   *
+   * @param event the finished vertex
+   * @return a newly created entity of type TEZ_VERTEX_ID
+   */
+  private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
+    final String vertexId = event.getVertexID().toString();
+    final String dagId = event.getVertexID().getDAGId().toString();
+    final String appId = event.getVertexID().getDAGId().getApplicationId().toString();
+    final String finalState = event.getState().name();
+
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(vertexId);
+    entity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    entity.addPrimaryFilter(ATSConstants.APPLICATION_ID, appId);
+    entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+    entity.addPrimaryFilter(ATSConstants.STATUS, finalState);
+
+    TimelineEvent finished = new TimelineEvent();
+    finished.setEventType(HistoryEventType.VERTEX_FINISHED.name());
+    finished.setTimestamp(event.getFinishTime());
+    entity.addEvent(finished);
+
+    entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    entity.addOtherInfo(ATSConstants.STATUS, finalState);
+    entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    entity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    entity.addOtherInfo(ATSConstants.STATS,
+        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+
+    final Map<String, Integer> taskStats = event.getVertexTaskStats();
+    if (taskStats == null) {
+      return entity;
+    }
+    for (Entry<String, Integer> stat : taskStats.entrySet()) {
+      entity.addOtherInfo(stat.getKey(), stat.getValue());
+    }
+    return entity;
+  }
+
+  private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitedTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.setStartTime(event.getInitedTime());
+
+    atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
+    atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+    atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexParallelismUpdatedEvent(
+      VertexParallelismUpdatedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent updateEvt = new TimelineEvent();
+    updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
+    updateEvt.setTimestamp(event.getUpdateTime());
+
+    Map<String,Object> eventInfo = new HashMap<String, Object>();
+    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
+      for (Entry<String, EdgeManagerPluginDescriptor> entry :
+          event.getSourceEdgeManagers().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(),
+            DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()));
+      }
+      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
+    updateEvt.setEventInfo(eventInfo);
+    atsEntity.addEvent(updateEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+
+    return atsEntity;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..c68d395
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+  // Events handed to handle() wait here until a batch is collected and published.
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+
+  // Background thread created in serviceStart() that drains eventQueue.
+  private Thread eventHandlingThread;
+  // Set in serviceStop() to make the background loop exit.
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  // Loop-iteration counter and processed-event counter used for the periodic queue-stats log.
+  private int eventCounter = 0;
+  private int eventsProcessed = 0;
+  // Held while a batch is collected and published, both by the background thread and by
+  // the final flush in serviceStop().
+  private final Object lock = new Object();
+
+  @VisibleForTesting
+  TimelineClient timelineClient;
+
+  // DAGs whose events are dropped (pre-warm DAGs); an entry is removed on DAG_FINISHED.
+  private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+  // Per-DAG Timeline domain id taken from the DAG_SUBMITTED event's configuration.
+  private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>();
+  // Flush budget in milliseconds used by serviceStop(); a negative configured value
+  // switches waitForeverOnShutdown on instead.
+  private long maxTimeToWaitOnShutdown;
+  private boolean waitForeverOnShutdown = false;
+
+  // Upper bound on events per batch and per-event poll timeout (milliseconds).
+  private int maxEventsPerBatch;
+  private long maxPollingTimeMillis;
+
+  // Domain id applied to entities that have no DAG-specific domain.
+  private String sessionDomainId;
+  // ACL policy manager implementation, loaded reflectively by class name in serviceInit().
+  private static final String atsHistoryACLManagerClassName =
+      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
+  // Null when the ACL manager could not be instantiated and that is allowed by configuration.
+  private HistoryACLPolicyManager historyACLPolicyManager;
+
+  /** Creates the service, using this class's name as the service name. */
+  public ATSHistoryLoggingService() {
+    super(ATSHistoryLoggingService.class.getName());
+  }
+
+  /**
+   * Creates and initializes the Timeline client, reads the batching and shutdown settings
+   * from the configuration, and loads the history ACL policy manager by reflection.
+   *
+   * <p>A negative flush timeout means the shutdown flush waits without a time limit. If the
+   * ACL policy manager cannot be instantiated, the failure is rethrown unless the
+   * configuration allows running with Timeline domains disabled, in which case no ACL
+   * manager is used.
+   *
+   * @param conf service configuration
+   * @throws Exception if initialization fails
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+
+    maxTimeToWaitOnShutdown = conf.getLong(
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+    maxEventsPerBatch = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH,
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT);
+    maxPollingTimeMillis = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT,
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT);
+    if (maxTimeToWaitOnShutdown < 0) {
+      waitForeverOnShutdown = true;
+    }
+    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+
+    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    try {
+      historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+          atsHistoryACLManagerClassName);
+      historyACLPolicyManager.setConf(conf);
+    } catch (TezUncheckedException e) {
+      LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+          + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+      final boolean domainsMayBeDisabled = conf.getBoolean(
+          TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+          TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT);
+      if (!domainsMayBeDisabled) {
+        throw e;
+      }
+      // Continue without ACL enforcement.
+      historyACLPolicyManager = null;
+    }
+  }
+
+  /**
+   * Starts the Timeline client and launches the background thread that drains the event
+   * queue in batches and publishes them.
+   *
+   * <p>The thread loops until the service is marked stopped or the thread is interrupted.
+   * An interrupt received while a batch is being collected does not drop that batch: it is
+   * still published before the loop ends. Collecting and publishing a batch happen while
+   * holding {@code lock}.
+   */
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    timelineClient.start();
+
+    final Runnable eventLoop = new Runnable() {
+      @Override
+      public void run() {
+        final List<DAGHistoryEvent> batch = new LinkedList<DAGHistoryEvent>();
+        boolean interrupted = false;
+        while (!(stopped.get() || Thread.currentThread().isInterrupted() || interrupted)) {
+
+          // Once the iteration counter reaches a non-zero multiple of 1000, report the
+          // queue statistics (if there is anything to report) and reset both counters.
+          final boolean statsDue = eventCounter != 0 && eventCounter % 1000 == 0;
+          if (!statsDue) {
+            ++eventCounter;
+          } else {
+            if (eventsProcessed != 0 && !batch.isEmpty()) {
+              LOG.info("Event queue stats"
+                  + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                  + ", eventQueueSize=" + eventQueue.size());
+            }
+            eventCounter = 0;
+            eventsProcessed = 0;
+          }
+
+          synchronized (lock) {
+            try {
+              getEventBatch(batch);
+            } catch (InterruptedException e) {
+              // Publish what was collected so far; the loop condition then ends the thread.
+              interrupted = true;
+            }
+
+            if (!batch.isEmpty()) {
+              eventsProcessed += batch.size();
+              try {
+                handleEvents(batch);
+              } catch (Exception e) {
+                LOG.warn("Error handling events", e);
+              }
+            }
+          }
+        }
+      }
+    };
+    eventHandlingThread = new Thread(eventLoop, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  /**
+   * Stops the background thread, makes a bounded attempt to flush events still queued, and
+   * stops the Timeline client.
+   *
+   * <p>The flush runs under {@code lock} until the queue yields an empty batch, publishing
+   * fails, the configured flush timeout elapses (unless configured to wait forever), or the
+   * calling thread is interrupted. On interruption the partially collected batch is still
+   * published, draining then stops, and the caller's interrupt status is restored before
+   * returning. The method tolerates being invoked before the service was initialized or
+   * started.
+   */
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    boolean interrupted = false;
+    synchronized (lock) {
+      if (!eventQueue.isEmpty()) {
+        LOG.warn("ATSService being stopped"
+            + ", eventQueueBacklog=" + eventQueue.size()
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown
+            + ", waitForever=" + waitForeverOnShutdown);
+        long startTime = appContext.getClock().getTime();
+        long endTime = startTime + maxTimeToWaitOnShutdown;
+        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+        while (!interrupted
+            && (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime()))) {
+          try {
+            getEventBatch(events);
+          } catch (InterruptedException e) {
+            LOG.info("ATSService interrupted while shutting down. Exiting."
+                  + " EventQueueBacklog=" + eventQueue.size());
+            // Honour the interrupt: publish the partial batch below, then leave the loop.
+            interrupted = true;
+          }
+          if (events.isEmpty()) {
+            if (!interrupted) {
+              LOG.info("Event queue empty, stopping ATS Service");
+            }
+            break;
+          }
+          try {
+            handleEvents(events);
+          } catch (Exception e) {
+            LOG.warn("Error handling event", e);
+            break;
+          }
+        }
+      }
+    }
+    if (!eventQueue.isEmpty()) {
+      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+          + ", eventQueueBacklog=" + eventQueue.size());
+    }
+    // The client is only created in serviceInit(); stop may be requested before that.
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
+    if (interrupted) {
+      // Restore the status that catching InterruptedException cleared, after the client
+      // has been stopped so its shutdown is not cut short.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
+    events.clear();
+    int counter = 0;
+    while (counter < maxEventsPerBatch) {
+      DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS);
+      if (event == null) {
+        break;
+      }
+      if (!isValidEvent(event)) {
+        continue;
+      }
+      ++counter;
+      events.add(event);
+      if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
+        // Special case this as it might be a large payload
+        break;
+      }
+    }
+  }
+
+
+  /**
+   * Queues a history event; it is published later, in a batch, by the event handling
+   * thread or by the final flush in serviceStop().
+   *
+   * @param event the history event to record
+   */
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  private boolean isValidEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+    TezDAGID dagId = event.getDagID();
+
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return false;
+      }
+      if (historyACLPolicyManager != null) {
+        String dagDomainId = dagSubmittedEvent.getConf().get(
+            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+        if (dagDomainId != null) {
+          dagDomainIdMap.put(dagId, dagDomainId);
+        }
+      }
+    }
+    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+      // Remove from set to keep size small
+      // No more events should be seen after this point.
+      if (skippedDAGs.remove(dagId)) {
+        return false;
+      }
+    }
+
+    if (dagId != null && skippedDAGs.contains(dagId)) {
+      // Skip pre-warm DAGs
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Converts a batch of history events to Timeline entities, tags them with the applicable
+   * Timeline domain, and publishes them in a single put.
+   *
+   * <p>The domain is the one recorded for the event's DAG when available, otherwise the
+   * session domain; it is applied only when an ACL manager is present and the domain id is
+   * non-empty. Failures are logged, never propagated from the put itself.
+   *
+   * <p>Put errors are reported using the entity id and type carried by each error. The
+   * error list contains only the failed entities, so an error's position in that list says
+   * nothing about the position of the entity in the submitted batch.
+   *
+   * @param events the batch to publish
+   */
+  private void handleEvents(List<DAGHistoryEvent> events) {
+    TimelineEntity[] entities = new TimelineEntity[events.size()];
+    int idx = 0;
+    // Iterate rather than index: callers pass a LinkedList, where get(i) is linear.
+    for (DAGHistoryEvent event : events) {
+      String domainId = sessionDomainId;
+      TezDAGID dagId = event.getDagID();
+
+      if (historyACLPolicyManager != null && dagId != null
+          && dagDomainIdMap.containsKey(dagId)) {
+        domainId = dagDomainIdMap.get(dagId);
+      }
+
+      TimelineEntity entity =
+          HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+      if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+      }
+      entities[idx++] = entity;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending event batch to Timeline, batchSize=" + events.size());
+    }
+    try {
+      TimelinePutResponse response =
+          timelineClient.putEntities(entities);
+      if (response != null
+        && !response.getErrors().isEmpty()) {
+        for (TimelinePutError err : response.getErrors()) {
+          if (err.getErrorCode() != 0) {
+            LOG.warn("Could not post history event to ATS"
+                + ", atsPutError=" + err.getErrorCode()
+                + ", entityId=" + err.getEntityId()
+                + ", entityType=" + err.getEntityType());
+          }
+        }
+      }
+      // Do nothing additional, ATS client library should handle throttling
+      // or auto-disable as needed
+    } catch (Exception e) {
+      LOG.warn("Could not handle history events", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
new file mode 100644
index 0000000..ca47b92
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class HistoryEventTimelineConversion {
+
+  /**
+   * Converts a history event into the Timeline entity that represents it, dispatching on
+   * the event type to the matching conversion method.
+   *
+   * @param historyEvent the event to convert
+   * @return the entity built for the event
+   * @throws UnsupportedOperationException if the event reports that it is not a history
+   *         event, if its type is one of the explicitly excluded commit / data-movement
+   *         types, or if its type is not handled at all
+   */
+  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    switch (historyEvent.getEventType()) {
+      case APP_LAUNCHED:
+        return convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+      case AM_LAUNCHED:
+        return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+      case AM_STARTED:
+        return convertAMStartedEvent((AMStartedEvent) historyEvent);
+      case CONTAINER_LAUNCHED:
+        return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+      case CONTAINER_STOPPED:
+        return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+      case DAG_SUBMITTED:
+        return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+      case DAG_INITIALIZED:
+        return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+      case DAG_STARTED:
+        return convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+      case DAG_FINISHED:
+        return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+      case VERTEX_INITIALIZED:
+        return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+      case VERTEX_STARTED:
+        return convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+      case VERTEX_FINISHED:
+        return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+      case TASK_STARTED:
+        return convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+      case TASK_FINISHED:
+        return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+      case TASK_ATTEMPT_STARTED:
+        return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+      case TASK_ATTEMPT_FINISHED:
+        return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+      case VERTEX_PARALLELISM_UPDATED:
+        return convertVertexParallelismUpdatedEvent(
+            (VertexParallelismUpdatedEvent) historyEvent);
+      // These types are known but are not published to Timeline.
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+  }
+
+  private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    atsEntity.addOtherInfo(ATSConstants.CONFIG,
+        DAGUtils.convertConfigurationToATSMap(event.getConf()));
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    if (event.getVersion() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION,
+          DAGUtils.convertTezVersionToATSMap(event.getVersion()));
+    }
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.AM_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID,
+        event.getContainerId().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    // In case, a container is stopped in a different attempt
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+
+    TimelineEvent stoppedEvt = new TimelineEvent();
+    stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
+    stoppedEvt.setTimestamp(event.getStoppedTime());
+    atsEntity.addEvent(stoppedEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus());
+
+    atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
+
+    return atsEntity;
+  }
+
+  /**
+   * Builds the TEZ_DAG_ID timeline entity for a finished DAG.
+   * A DAG_FINISHED event stamped with the finish time is attached. Start and
+   * finish times, their difference, the final state name, diagnostics and the
+   * converted counters are stored as other info, followed by one other-info
+   * entry per DAG task statistic when the statistics map is present.
+   */
+  private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(event.getDagID().toString());
+    entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    final TimelineEvent dagFinished = new TimelineEvent();
+    dagFinished.setEventType(HistoryEventType.DAG_FINISHED.name());
+    dagFinished.setTimestamp(event.getFinishTime());
+    entity.addEvent(dagFinished);
+
+    final String appIdStr = event.getDagID().getApplicationId().toString();
+    final String finalState = event.getState().name();
+
+    // Primary filters.
+    entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    entity.addPrimaryFilter(ATSConstants.APPLICATION_ID, appIdStr);
+    entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+    entity.addPrimaryFilter(ATSConstants.STATUS, finalState);
+
+    // Other info.
+    entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    entity.addOtherInfo(ATSConstants.TIME_TAKEN,
+        (event.getFinishTime() - event.getStartTime()));
+    entity.addOtherInfo(ATSConstants.STATUS, finalState);
+    entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    entity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    // Each task statistic becomes its own other-info entry, keyed by its name.
+    final Map<String, Integer> taskStats = event.getDagTaskStats();
+    if (taskStats != null) {
+      for (Entry<String, Integer> stat : taskStats.entrySet()) {
+        entity.addOtherInfo(stat.getKey(), stat.getValue());
+      }
+    }
+
+    return entity;
+  }
+
+  private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+
+    if (event.getVertexNameIDMap() != null) {
+      Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+      }
+      atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+    }
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.DAG_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(),
+        "tez_" + event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+    submitEvt.setTimestamp(event.getSubmitTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.setStartTime(event.getSubmitTime());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getDagID().getApplicationId().toString());
+
+    try {
+      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskAttemptError() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+    }
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getSuccessfulAttemptID() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+          event.getSuccessfulAttemptID().toString());
+    }
+
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name());
+
+    return atsEntity;
+  }
+
+  /**
+   * Builds the TEZ_VERTEX_ID timeline entity for a finished vertex.
+   * A VERTEX_FINISHED event stamped with the finish time is attached. Finish
+   * time, time taken, final state name, diagnostics, converted counters and
+   * converted vertex stats are stored as other info, followed by one
+   * other-info entry per vertex task statistic when that map is present.
+   */
+  private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(event.getVertexID().toString());
+    entity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Primary filters.
+    final String appIdStr = event.getVertexID().getDAGId().getApplicationId().toString();
+    final String dagIdStr = event.getVertexID().getDAGId().toString();
+    entity.addPrimaryFilter(ATSConstants.APPLICATION_ID, appIdStr);
+    entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagIdStr);
+
+    final TimelineEvent vertexFinished = new TimelineEvent();
+    vertexFinished.setEventType(HistoryEventType.VERTEX_FINISHED.name());
+    vertexFinished.setTimestamp(event.getFinishTime());
+    entity.addEvent(vertexFinished);
+
+    final String finalState = event.getState().name();
+    entity.addPrimaryFilter(ATSConstants.STATUS, finalState);
+
+    // Other info.
+    entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    entity.addOtherInfo(ATSConstants.TIME_TAKEN,
+        (event.getFinishTime() - event.getStartTime()));
+    entity.addOtherInfo(ATSConstants.STATUS, finalState);
+
+    entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    entity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    entity.addOtherInfo(ATSConstants.STATS,
+        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+
+    // Each task statistic becomes its own other-info entry, keyed by its name.
+    final Map<String, Integer> taskStats = event.getVertexTaskStats();
+    if (taskStats != null) {
+      for (Entry<String, Integer> stat : taskStats.entrySet()) {
+        entity.addOtherInfo(stat.getKey(), stat.getValue());
+      }
+    }
+
+    return entity;
+  }
+
+  private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitedTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.setStartTime(event.getInitedTime());
+
+    atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
+    atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+    atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexParallelismUpdatedEvent(
+      VertexParallelismUpdatedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
+        event.getVertexID().getDAGId().getApplicationId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent updateEvt = new TimelineEvent();
+    updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
+    updateEvt.setTimestamp(event.getUpdateTime());
+
+    Map<String,Object> eventInfo = new HashMap<String, Object>();
+    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
+      for (Entry<String, EdgeManagerPluginDescriptor> entry :
+          event.getSourceEdgeManagers().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(),
+            DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()));
+      }
+      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
+    updateEvt.setEventInfo(eventInfo);
+    atsEntity.addEvent(updateEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+
+    return atsEntity;
+  }
+
+}


Mime
View raw message