tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [5/6] tez git commit: TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh)
Date Wed, 11 Mar 2015 04:42:42 GMT
TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh)

(cherry picked from commit 6692c514efa2b8bdc1f19bead9dedc3d1bb3d88b)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3629dbe8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3629dbe8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3629dbe8

Branch: refs/heads/branch-0.6
Commit: 3629dbe86583b3e2738652949b9050b83c42606e
Parents: 200b42b
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Mar 10 21:37:39 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Mar 10 21:42:44 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../logging/ats/ATSHistoryLoggingService.java   | 325 -------
 .../ats/HistoryEventTimelineConversion.java     | 631 --------------
 .../ats/TestATSHistoryLoggingService.java       | 145 ----
 .../ats/TestATSHistoryWithMiniCluster.java      | 242 ------
 .../ats/TestHistoryEventTimelineConversion.java | 850 -------------------
 .../tez/tests/MiniTezClusterWithTimeline.java   | 253 ------
 .../java/org/apache/tez/dag/history/logging/ats |   1 -
 .../logging/ats/ATSHistoryLoggingService.java   | 325 +++++++
 .../ats/HistoryEventTimelineConversion.java     | 631 ++++++++++++++
 .../java/org/apache/tez/dag/history/logging/ats |   1 -
 .../ats/TestATSHistoryLoggingService.java       | 145 ++++
 .../ats/TestATSHistoryWithMiniCluster.java      | 242 ++++++
 .../ats/TestHistoryEventTimelineConversion.java | 850 +++++++++++++++++++
 .../src/test/java/org/apache/tez/tests          |   1 -
 .../tez/tests/MiniTezClusterWithTimeline.java   | 253 ++++++
 16 files changed, 2448 insertions(+), 2449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2a05b0a..ea4b0d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history
+    and tez-yarn-timeline-history-with-acls.
   TEZ-2190. TestOrderedWordCount fails when generateSplitsInClient set to true.
   TEZ-2091. Add support for hosting TEZ_UI with nodejs.
   TEZ-2165. Tez UI: DAG shows running status if killed by RM in some cases.

http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
deleted file mode 100644
index c68d395..0000000
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.logging.HistoryLoggingService;
-import org.apache.tez.dag.records.TezDAGID;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class ATSHistoryLoggingService extends HistoryLoggingService {
-
-  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
-
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<DAGHistoryEvent>();
-
-  private Thread eventHandlingThread;
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-  private int eventCounter = 0;
-  private int eventsProcessed = 0;
-  private final Object lock = new Object();
-
-  @VisibleForTesting
-  TimelineClient timelineClient;
-
-  private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
-  private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>();
-  private long maxTimeToWaitOnShutdown;
-  private boolean waitForeverOnShutdown = false;
-
-  private int maxEventsPerBatch;
-  private long maxPollingTimeMillis;
-
-  private String sessionDomainId;
-  private static final String atsHistoryACLManagerClassName =
-      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
-  private HistoryACLPolicyManager historyACLPolicyManager;
-
-  public ATSHistoryLoggingService() {
-    super(ATSHistoryLoggingService.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing ATSService");
-    timelineClient = TimelineClient.createTimelineClient();
-    timelineClient.init(conf);
-    maxTimeToWaitOnShutdown = conf.getLong(
-        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
-        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
-    maxEventsPerBatch = conf.getInt(
-        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH,
-        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT);
-    maxPollingTimeMillis = conf.getInt(
-        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT,
-        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT);
-    if (maxTimeToWaitOnShutdown < 0) {
-      waitForeverOnShutdown = true;
-    }
-    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
-
-    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
-    try {
-      historyACLPolicyManager = ReflectionUtils.createClazzInstance(
-          atsHistoryACLManagerClassName);
-      historyACLPolicyManager.setConf(conf);
-    } catch (TezUncheckedException e) {
-      LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
-          + ". ACLs cannot be enforced correctly for history data in Timeline", e);
-      if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
-          TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
-        throw e;
-      }
-      historyACLPolicyManager = null;
-    }
-
-  }
-
-  @Override
-  public void serviceStart() {
-    LOG.info("Starting ATSService");
-    timelineClient.start();
-
-    eventHandlingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
-        boolean interrupted = false;
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()
-              && !interrupted) {
-
-          // Log the size of the event-queue every so often.
-          if (eventCounter != 0 && eventCounter % 1000 == 0) {
-            if (eventsProcessed != 0 && !events.isEmpty()) {
-              LOG.info("Event queue stats"
-                  + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
-                  + ", eventQueueSize=" + eventQueue.size());
-            }
-            eventCounter = 0;
-            eventsProcessed = 0;
-          } else {
-            ++eventCounter;
-          }
-
-          synchronized (lock) {
-            try {
-              getEventBatch(events);
-            } catch (InterruptedException e) {
-              // Finish processing events and then return
-              interrupted = true;
-            }
-
-            if (events.isEmpty()) {
-              continue;
-            }
-
-            eventsProcessed += events.size();
-            try {
-              handleEvents(events);
-            } catch (Exception e) {
-              LOG.warn("Error handling events", e);
-            }
-          }
-        }
-      }
-    }, "HistoryEventHandlingThread");
-    eventHandlingThread.start();
-  }
-
-  @Override
-  public void serviceStop() {
-    LOG.info("Stopping ATSService"
-        + ", eventQueueBacklog=" + eventQueue.size());
-    stopped.set(true);
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
-    }
-    synchronized (lock) {
-      if (!eventQueue.isEmpty()) {
-        LOG.warn("ATSService being stopped"
-            + ", eventQueueBacklog=" + eventQueue.size()
-            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown
-            + ", waitForever=" + waitForeverOnShutdown);
-        long startTime = appContext.getClock().getTime();
-        long endTime = startTime + maxTimeToWaitOnShutdown;
-        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
-        while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) {
-          try {
-            getEventBatch(events);
-          } catch (InterruptedException e) {
-            LOG.info("ATSService interrupted while shutting down. Exiting."
-                  + " EventQueueBacklog=" + eventQueue.size());
-          }
-          if (events.isEmpty()) {
-            LOG.info("Event queue empty, stopping ATS Service");
-            break;
-          }
-          try {
-            handleEvents(events);
-          } catch (Exception e) {
-            LOG.warn("Error handling event", e);
-            break;
-          }
-        }
-      }
-    }
-    if (!eventQueue.isEmpty()) {
-      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
-          + ", eventQueueBacklog=" + eventQueue.size());
-    }
-    timelineClient.stop();
-  }
-
-  private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
-    events.clear();
-    int counter = 0;
-    while (counter < maxEventsPerBatch) {
-      DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS);
-      if (event == null) {
-        break;
-      }
-      if (!isValidEvent(event)) {
-        continue;
-      }
-      ++counter;
-      events.add(event);
-      if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
-        // Special case this as it might be a large payload
-        break;
-      }
-    }
-  }
-
-
-  public void handle(DAGHistoryEvent event) {
-    eventQueue.add(event);
-  }
-
-  private boolean isValidEvent(DAGHistoryEvent event) {
-    HistoryEventType eventType = event.getHistoryEvent().getEventType();
-    TezDAGID dagId = event.getDagID();
-
-    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-      DAGSubmittedEvent dagSubmittedEvent =
-          (DAGSubmittedEvent) event.getHistoryEvent();
-      String dagName = dagSubmittedEvent.getDAGName();
-      if (dagName != null
-          && dagName.startsWith(
-          TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
-        // Skip recording pre-warm DAG events
-        skippedDAGs.add(dagId);
-        return false;
-      }
-      if (historyACLPolicyManager != null) {
-        String dagDomainId = dagSubmittedEvent.getConf().get(
-            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
-        if (dagDomainId != null) {
-          dagDomainIdMap.put(dagId, dagDomainId);
-        }
-      }
-    }
-    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
-      // Remove from set to keep size small
-      // No more events should be seen after this point.
-      if (skippedDAGs.remove(dagId)) {
-        return false;
-      }
-    }
-
-    if (dagId != null && skippedDAGs.contains(dagId)) {
-      // Skip pre-warm DAGs
-      return false;
-    }
-
-    return true;
-  }
-
-  private void handleEvents(List<DAGHistoryEvent> events) {
-    TimelineEntity[] entities = new TimelineEntity[events.size()];
-    for (int i = 0; i < events.size(); ++i) {
-      DAGHistoryEvent event = events.get(i);
-      String domainId = sessionDomainId;
-      TezDAGID dagId = event.getDagID();
-
-      if (historyACLPolicyManager != null && dagId != null) {
-        if (dagDomainIdMap.containsKey(dagId)) {
-          domainId = dagDomainIdMap.get(dagId);
-        }
-      }
-
-      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
-      if (historyACLPolicyManager != null) {
-        if (domainId != null && !domainId.isEmpty()) {
-          historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId);
-        }
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending event batch to Timeline, batchSize=" + events.size());
-    }
-    try {
-      TimelinePutResponse response =
-          timelineClient.putEntities(entities);
-      if (response != null
-        && !response.getErrors().isEmpty()) {
-        int count = response.getErrors().size();
-        for (int i = 0; i < count; ++i) {
-          TimelinePutError err = response.getErrors().get(i);
-          if (err.getErrorCode() != 0) {
-            LOG.warn("Could not post history event to ATS"
-                + ", atsPutError=" + err.getErrorCode()
-                + ", entityId=" + entities[i].getEntityId()
-                + ", eventType=" + events.get(i).getHistoryEvent().getEventType());
-          }
-        }
-      }
-      // Do nothing additional, ATS client library should handle throttling
-      // or auto-disable as needed
-    } catch (Exception e) {
-      LOG.warn("Could not handle history events", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
deleted file mode 100644
index 8600776..0000000
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.tez.common.ATSConstants;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.AMLaunchedEvent;
-import org.apache.tez.dag.history.events.AMStartedEvent;
-import org.apache.tez.dag.history.events.AppLaunchedEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.history.events.ContainerStoppedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.DAGUtils;
-import org.apache.tez.dag.records.TezVertexID;
-
-public class HistoryEventTimelineConversion {
-
-  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
-    if (!historyEvent.isHistoryEvent()) {
-      throw new UnsupportedOperationException("Invalid Event, does not support history"
-          + ", eventType=" + historyEvent.getEventType());
-    }
-    TimelineEntity timelineEntity;
-    switch (historyEvent.getEventType()) {
-      case APP_LAUNCHED:
-        timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
-        break;
-      case AM_LAUNCHED:
-        timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
-        break;
-      case AM_STARTED:
-        timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
-        break;
-      case CONTAINER_LAUNCHED:
-        timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
-        break;
-      case CONTAINER_STOPPED:
-        timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
-        break;
-      case DAG_SUBMITTED:
-        timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
-        break;
-      case DAG_INITIALIZED:
-        timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
-        break;
-      case DAG_STARTED:
-        timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
-        break;
-      case DAG_FINISHED:
-        timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
-        break;
-      case VERTEX_INITIALIZED:
-        timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
-        break;
-      case VERTEX_STARTED:
-        timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
-        break;
-      case VERTEX_FINISHED:
-        timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
-      break;
-      case TASK_STARTED:
-        timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
-        break;
-      case TASK_FINISHED:
-        timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
-        break;
-      case TASK_ATTEMPT_STARTED:
-        timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
-        break;
-      case TASK_ATTEMPT_FINISHED:
-        timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
-        break;
-      case VERTEX_PARALLELISM_UPDATED:
-        timelineEntity = convertVertexParallelismUpdatedEvent(
-            (VertexParallelismUpdatedEvent) historyEvent);
-        break;
-      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-      case VERTEX_COMMIT_STARTED:
-      case VERTEX_GROUP_COMMIT_STARTED:
-      case VERTEX_GROUP_COMMIT_FINISHED:
-      case DAG_COMMIT_STARTED:
-        throw new UnsupportedOperationException("Invalid Event, does not support history"
-            + ", eventType=" + historyEvent.getEventType());
-      default:
-        throw new UnsupportedOperationException("Unhandled Event"
-            + ", eventType=" + historyEvent.getEventType());
-    }
-    return timelineEntity;
-  }
-
-  private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId("tez_"
-        + event.getApplicationId().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name());
-
-    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
-        event.getApplicationId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-
-    atsEntity.addOtherInfo(ATSConstants.CONFIG,
-        DAGUtils.convertConfigurationToATSMap(event.getConf()));
-
-    atsEntity.setStartTime(event.getLaunchTime());
-
-    if (event.getVersion() != null) {
-      atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION,
-          DAGUtils.convertTezVersionToATSMap(event.getVersion()));
-    }
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId("tez_"
-        + event.getApplicationAttemptId().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
-        event.getApplicationAttemptId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-
-    atsEntity.setStartTime(event.getLaunchTime());
-
-    TimelineEvent launchEvt = new TimelineEvent();
-    launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name());
-    launchEvt.setTimestamp(event.getLaunchTime());
-    atsEntity.addEvent(launchEvt);
-
-    atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId("tez_"
-        + event.getApplicationAttemptId().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-
-    TimelineEvent startEvt = new TimelineEvent();
-    startEvt.setEventType(HistoryEventType.AM_STARTED.name());
-    startEvt.setTimestamp(event.getStartTime());
-    atsEntity.addEvent(startEvt);
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId("tez_"
-        + event.getContainerId().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
-
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
-        "tez_" + event.getApplicationAttemptId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID,
-        event.getContainerId().toString());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-
-    atsEntity.setStartTime(event.getLaunchTime());
-
-    TimelineEvent launchEvt = new TimelineEvent();
-    launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
-    launchEvt.setTimestamp(event.getLaunchTime());
-    atsEntity.addEvent(launchEvt);
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId("tez_"
-        + event.getContainerId().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
-
-    // In case, a container is stopped in a different attempt
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
-        "tez_" + event.getApplicationAttemptId().toString());
-
-    TimelineEvent stoppedEvt = new TimelineEvent();
-    stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
-    stoppedEvt.setTimestamp(event.getStoppedTime());
-    atsEntity.addEvent(stoppedEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus());
-
-    atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
-    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getDagID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
-
-    TimelineEvent finishEvt = new TimelineEvent();
-    finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name());
-    finishEvt.setTimestamp(event.getFinishTime());
-    atsEntity.addEvent(finishEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getDagID().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
-    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
-
-    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
-    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
-    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
-    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
-
-    final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
-    if (dagTaskStats != null) {
-      for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
-        atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
-      }
-    }
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getDagID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
-
-    TimelineEvent initEvt = new TimelineEvent();
-    initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
-    initEvt.setTimestamp(event.getInitTime());
-    atsEntity.addEvent(initEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getDagID().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
-
-    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
-
-    if (event.getVertexNameIDMap() != null) {
-      Map<String, String> nameIdStrMap = new TreeMap<String, String>();
-      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
-        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
-      }
-      atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
-    }
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getDagID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
-
-    TimelineEvent startEvt = new TimelineEvent();
-    startEvt.setEventType(HistoryEventType.DAG_STARTED.name());
-    startEvt.setTimestamp(event.getStartTime());
-    atsEntity.addEvent(startEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getDagID().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
-
-    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getDagID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
-
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(),
-        "tez_" + event.getApplicationAttemptId().getApplicationId().toString());
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
-        "tez_" + event.getApplicationAttemptId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
-        event.getApplicationAttemptId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
-
-    TimelineEvent submitEvt = new TimelineEvent();
-    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
-    submitEvt.setTimestamp(event.getSubmitTime());
-    atsEntity.addEvent(submitEvt);
-
-    atsEntity.setStartTime(event.getSubmitTime());
-
-    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
-    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getDagID().getApplicationId().toString());
-
-    try {
-      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
-    atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
-        event.getApplicationAttemptId().getApplicationId().toString());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getTaskAttemptID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
-        event.getTaskAttemptID().getTaskID().getVertexID().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
-        event.getTaskAttemptID().getTaskID().toString());
-
-    TimelineEvent finishEvt = new TimelineEvent();
-    finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
-    finishEvt.setTimestamp(event.getFinishTime());
-    atsEntity.addEvent(finishEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
-
-    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
-    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
-    if (event.getTaskAttemptError() != null) {
-      atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
-    }
-    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
-    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToATSMap(event.getCounters()));
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getTaskAttemptID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
-    atsEntity.setStartTime(event.getStartTime());
-
-    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
-    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
-        event.getTaskAttemptID().getTaskID().toString());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
-        event.getTaskAttemptID().getTaskID().getVertexID().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
-        event.getTaskAttemptID().getTaskID().toString());
-
-    TimelineEvent startEvt = new TimelineEvent();
-    startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
-    startEvt.setTimestamp(event.getStartTime());
-    atsEntity.addEvent(startEvt);
-
-    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
-    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
-    atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
-    atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
-    atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
-    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getTaskID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getTaskID().getVertexID().getDAGId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
-        event.getTaskID().getVertexID().toString());
-
-    TimelineEvent finishEvt = new TimelineEvent();
-    finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name());
-    finishEvt.setTimestamp(event.getFinishTime());
-    atsEntity.addEvent(finishEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
-
-    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
-    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
-    if (event.getSuccessfulAttemptID() != null) {
-      atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
-          event.getSuccessfulAttemptID().toString());
-    }
-
-    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
-    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getTaskID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
-
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(),
-        event.getTaskID().getVertexID().toString());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getTaskID().getVertexID().getDAGId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
-        event.getTaskID().getVertexID().toString());
-
-    TimelineEvent startEvt = new TimelineEvent();
-    startEvt.setEventType(HistoryEventType.TASK_STARTED.name());
-    startEvt.setTimestamp(event.getStartTime());
-    atsEntity.addEvent(startEvt);
-
-    atsEntity.setStartTime(event.getStartTime());
-
-    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
-    atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getVertexID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getVertexID().getDAGId().toString());
-
-    TimelineEvent finishEvt = new TimelineEvent();
-    finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name());
-    finishEvt.setTimestamp(event.getFinishTime());
-    atsEntity.addEvent(finishEvt);
-
-    atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
-
-    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
-    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
-
-    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
-    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
-    atsEntity.addOtherInfo(ATSConstants.STATS,
-        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
-
-    final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
-    if (vertexTaskStats != null) {
-      for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
-        atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
-      }
-    }
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getVertexID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
-
-    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(),
-        event.getVertexID().getDAGId().toString());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getVertexID().getDAGId().toString());
-
-    TimelineEvent initEvt = new TimelineEvent();
-    initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
-    initEvt.setTimestamp(event.getInitedTime());
-    atsEntity.addEvent(initEvt);
-
-    atsEntity.setStartTime(event.getInitedTime());
-
-    atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
-    atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
-    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
-    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
-    atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getVertexID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getVertexID().getDAGId().toString());
-
-    TimelineEvent startEvt = new TimelineEvent();
-    startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name());
-    startEvt.setTimestamp(event.getStartTime());
-    atsEntity.addEvent(startEvt);
-
-    atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
-    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
-
-    return atsEntity;
-  }
-
-  private static TimelineEntity convertVertexParallelismUpdatedEvent(
-      VertexParallelismUpdatedEvent event) {
-    TimelineEntity atsEntity = new TimelineEntity();
-    atsEntity.setEntityId(event.getVertexID().toString());
-    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
-
-    atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
-        event.getVertexID().getDAGId().getApplicationId().toString());
-    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
-        event.getVertexID().getDAGId().toString());
-
-    TimelineEvent updateEvt = new TimelineEvent();
-    updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
-    updateEvt.setTimestamp(event.getUpdateTime());
-
-    Map<String,Object> eventInfo = new HashMap<String, Object>();
-    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
-      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
-      for (Entry<String, EdgeManagerPluginDescriptor> entry :
-          event.getSourceEdgeManagers().entrySet()) {
-        updatedEdgeManagers.put(entry.getKey(),
-            DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()));
-      }
-      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
-    }
-    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
-    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
-    updateEvt.setEventInfo(eventInfo);
-    atsEntity.addEvent(updateEvt);
-
-    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
-
-    return atsEntity;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
deleted file mode 100644
index 18ec43e..0000000
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.records.TezDAGID;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestATSHistoryLoggingService {
-
-  private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
-
-  private ATSHistoryLoggingService atsHistoryLoggingService;
-  private AppContext appContext;
-  private Configuration conf;
-  private int atsInvokeCounter;
-  private int atsEntitiesCounter;
-  private SystemClock clock = new SystemClock();
-
-  @Before
-  public void setup() throws Exception {
-    appContext = mock(AppContext.class);
-    atsHistoryLoggingService = new ATSHistoryLoggingService();
-    atsHistoryLoggingService.setAppContext(appContext);
-    conf = new Configuration(false);
-    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
-        1000l);
-    conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
-    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-    atsInvokeCounter = 0;
-    atsEntitiesCounter = 0;
-    atsHistoryLoggingService.init(conf);
-    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
-    atsHistoryLoggingService.start();
-    when(appContext.getClock()).thenReturn(clock);
-    when(appContext.getCurrentDAGID()).thenReturn(null);
-    when(atsHistoryLoggingService.timelineClient.putEntities(
-        Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
-        new Answer<Object>() {
-          @Override
-          public Object answer(InvocationOnMock invocation) throws Throwable {
-            ++atsInvokeCounter;
-            atsEntitiesCounter += invocation.getArguments().length;
-            try {
-              Thread.sleep(500l);
-            } catch (InterruptedException e) {
-              // do nothing
-            }
-            return null;
-          }
-        }
-    );
-  }
-
-  @After
-  public void teardown() {
-    atsHistoryLoggingService.stop();
-    atsHistoryLoggingService = null;
-  }
-
-  @Test(timeout=20000)
-  public void testATSHistoryLoggingServiceShutdown() {
-    TezDAGID tezDAGID = TezDAGID.getInstance(
-        ApplicationId.newInstance(100l, 1), 1);
-    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
-        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
-    for (int i = 0; i < 100; ++i) {
-      atsHistoryLoggingService.handle(historyEvent);
-    }
-
-    try {
-      Thread.sleep(2500l);
-    } catch (InterruptedException e) {
-      // Do nothing
-    }
-    atsHistoryLoggingService.stop();
-
-    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
-        + ", timelineInvocations=" + atsInvokeCounter);
-
-    Assert.assertTrue(atsEntitiesCounter >= 4);
-    Assert.assertTrue(atsEntitiesCounter < 20);
-
-  }
-
-  @Test(timeout=20000)
-  public void testATSEventBatching() {
-    TezDAGID tezDAGID = TezDAGID.getInstance(
-        ApplicationId.newInstance(100l, 1), 1);
-    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
-        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
-    for (int i = 0; i < 100; ++i) {
-      atsHistoryLoggingService.handle(historyEvent);
-    }
-
-    try {
-      Thread.sleep(1000l);
-    } catch (InterruptedException e) {
-      // Do nothing
-    }
-    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
-        + ", timelineInvocations=" + atsInvokeCounter);
-
-    Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter);
-    Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
deleted file mode 100644
index 9c4f721..0000000
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import java.io.IOException;
-import java.util.Random;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-public class TestATSHistoryWithMiniCluster {
-
-  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
-
-  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
-  protected static MiniDFSCluster dfsCluster = null;
-  private static String timelineAddress;
-  private Random random = new Random();
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem remoteFs;
-
-  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
-      + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    try {
-      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
-          .build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-
-    if (mrrTezCluster == null) {
-      try {
-        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
-            1, 1, 1, true);
-        Configuration conf = new Configuration();
-        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-        conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
-        conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
-        mrrTezCluster.init(conf);
-        mrrTezCluster.start();
-      } catch (Throwable e) {
-        LOG.info("Failed to start Mini Tez Cluster", e);
-      }
-    }
-    timelineAddress = mrrTezCluster.getConfig().get(
-        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
-    if (timelineAddress != null) {
-      // Hack to handle bug in MiniYARNCluster handling of webapp address
-      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
-    }
-  }
-
-  @AfterClass
-  public static void tearDown() throws InterruptedException {
-    LOG.info("Shutdown invoked");
-    Thread.sleep(10000);
-    if (mrrTezCluster != null) {
-      mrrTezCluster.stop();
-      mrrTezCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-  }
-
-  // To be replaced after Timeline has java APIs for domains
-  private <K> K getTimelineData(String url, Class<K> clazz) {
-    Client client = new Client();
-    WebResource resource = client.resource(url);
-
-    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
-        .get(ClientResponse.class);
-    Assert.assertEquals(200, response.getStatus());
-    Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
-
-    K entity = response.getEntity(clazz);
-    Assert.assertNotNull(entity);
-    return entity;
-  }
-
-  @Test (timeout=50000)
-  public void testSimpleAMACls() throws Exception {
-    TezClient tezSession = null;
-    ApplicationId applicationId;
-    try {
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-      DAG dag = DAG.create("TezSleepProcessor");
-      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-          Resource.newInstance(256, 1));
-      dag.addVertex(vertex);
-
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-          ATSHistoryLoggingService.class.getName());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-      tezSession.start();
-
-      applicationId = tezSession.getAppMasterApplicationId();
-
-      DAGClient dagClient = tezSession.submitDAG(dag);
-
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-
-//    verifyEntityExistence(applicationId);
-  }
-
-  @Test (timeout=50000)
-  public void testDAGACls() throws Exception {
-    TezClient tezSession = null;
-    ApplicationId applicationId;
-    try {
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-      DAG dag = DAG.create("TezSleepProcessor");
-      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-          Resource.newInstance(256, 1));
-      dag.addVertex(vertex);
-
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-          ATSHistoryLoggingService.class.getName());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-      tezSession.start();
-
-      applicationId = tezSession.getAppMasterApplicationId();
-
-      DAGClient dagClient = tezSession.submitDAG(dag);
-
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-//    verifyEntityExistence(applicationId);
-  }
-
-  private void verifyEntityExistence(ApplicationId applicationId) {
-    Assert.assertNotNull(timelineAddress);
-
-    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
-        + "tez_" + applicationId.toString()  + "?fields=otherinfo";
-    LOG.info("Getting timeline entity for tez application: " + appUrl);
-    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
-    Assert.assertNotNull(appEntity);
-
-    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
-    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
-        + tezDAGID.toString() + "?fields=otherinfo";
-    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
-    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
-    Assert.assertNotNull(dagEntity);
-  }
-
-
-}


Mime
View raw message