tez-commits mailing list archives

From hit...@apache.org
Subject [1/6] tez git commit: TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh)
Date Wed, 11 Mar 2015 04:42:38 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 200b42b7d -> 559099ea6


http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..6d713c5
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventTimelineConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  @Test(timeout = 5000)
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case APP_LAUNCHED:
+          event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+              user, new Configuration(false), null);
+          break;
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user, null);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName(), null);
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null, "nodeHttpAddress");
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexRecoverableEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    }
+  }
+
+  static class MockVersionInfo extends VersionInfo {
+
+    MockVersionInfo() {
+      super("component", "1.1.0", "rev1", "20120101", "git.apache.org");
+    }
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertAppLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+    Configuration conf = new Configuration(false);
+    conf.set("foo", "bar");
+    conf.set("applicationId", "1234");
+
+    MockVersionInfo mockVersionInfo = new MockVersionInfo();
+    AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
+        submitTime, user, conf, mockVersionInfo);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType());
+    Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertEquals(2, timelineEntity.getOtherInfo().size());
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG));
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION));
+
+    Map<String, String> config =
+        (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
+    Assert.assertEquals(conf.get("foo"), config.get("foo"));
+    Assert.assertEquals(conf.get("applicationId"), config.get("applicationId"));
+
+    Map<String, String> versionInfo =
+        (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION);
+    Assert.assertEquals(mockVersionInfo.getVersion(),
+        versionInfo.get(ATSConstants.VERSION));
+    Assert.assertEquals(mockVersionInfo.getRevision(),
+        versionInfo.get(ATSConstants.REVISION));
+    Assert.assertEquals(mockVersionInfo.getBuildTime(),
+        versionInfo.get(ATSConstants.BUILD_TIME));
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertAMLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+    AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
+
+    final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities();
+    Assert.assertEquals(3, relatedEntities.size());
+    Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ATTEMPT_ID)
+        .contains(applicationAttemptId.toString()));
+    Assert.assertTrue(relatedEntities.get(ATSConstants.USER).contains(user));
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(2, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+
+    Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.AM_LAUNCHED.name(), evt.getEventType());
+    Assert.assertEquals(launchTime, evt.getTimestamp());
+
+    final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
+    Assert.assertEquals(1, otherInfo.size());
+    Assert.assertEquals(submitTime, otherInfo.get(ATSConstants.APP_SUBMIT_TIME));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertContainerLaunchedEvent() {
+    long launchTime = random.nextLong();
+    ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
+        applicationAttemptId);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains(
+        containerId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
+            "tez_" + applicationAttemptId.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+        applicationAttemptId.getApplicationId().toString()));
+
+    Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(),
+        timelineEntity.getEvents().get(0).getEventType());
+    Assert.assertEquals(launchTime,
+        timelineEntity.getEvents().get(0).getTimestamp());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertContainerStoppedEvent() {
+    long stopTime = random.nextLong();
+    int exitStatus = random.nextInt();
+    ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus,
+        applicationAttemptId);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
+
+    final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities();
+    Assert.assertEquals(1, relatedEntities.size());
+    Assert.assertTrue(relatedEntities.get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name())
+        .contains("tez_" + applicationAttemptId.toString()));
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(2, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.EXIT_STATUS).contains(exitStatus));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    final TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.CONTAINER_STOPPED.name(), evt.getEventType());
+    Assert.assertEquals(stopTime, evt.getTimestamp());
+
+    final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
+    Assert.assertEquals(2, otherInfo.size());
+    Assert.assertEquals(exitStatus, otherInfo.get(ATSConstants.EXIT_STATUS));
+    Assert.assertEquals(stopTime, otherInfo.get(ATSConstants.FINISH_TIME));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGStartedEvent() {
+    long startTime = random.nextLong();
+    String dagName = "testDagName";
+    DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_STARTED.name(), evt.getEventType());
+    Assert.assertEquals(startTime, evt.getTimestamp());
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(3, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.DAG_NAME).contains(dagName));
+
+    final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
+    Assert.assertEquals(2, otherInfo.size());
+    Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME));
+    Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGSubmittedEvent() {
+    long submitTime = random.nextLong();
+
+    DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
+        applicationAttemptId, null, user, null);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(5, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains(
+            "tez_" + applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
+            "tez_" + applicationAttemptId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID).contains(
+            applicationAttemptId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains(
+            applicationAttemptId.getApplicationId().toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
+            dagPlan.getName()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationAttemptId.getApplicationId().toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
+    Assert.assertEquals(applicationId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskAttemptFinishedEvent(){
+    String vertexName = "testVertex";
+    long startTime = random.nextLong();
+    long finishTime = startTime + 1234;
+    TaskAttemptState state = TaskAttemptState
+        .values()[random.nextInt(TaskAttemptState.values().length)];
+    TaskAttemptTerminationCause error = TaskAttemptTerminationCause
+        .values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
+    String diagnostics = "random diagnostics message";
+    TezCounters counters = new TezCounters();
+
+    TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
+        startTime, finishTime, state, error, diagnostics, counters);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(5, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
+        .contains(tezDAGID.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name())
+        .contains(tezVertexID.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_TASK_ID.name())
+        .contains(tezTaskID.toString()));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_FINISHED.name(), evt.getEventType());
+    Assert.assertEquals(finishTime, evt.getTimestamp());
+
+    final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
+    Assert.assertEquals(6, otherInfo.size());
+    Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
+    Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
+    Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
+    Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+    Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
+    Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGInitializedEvent() {
+    long initTime = random.nextLong();
+
+    Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+    nameIdMap.put("foo", tezVertexID);
+
+    DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
+        nameIdMap);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
+        ATSConstants.VERTEX_NAME_ID_MAPPING));
+    Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
+        ATSConstants.VERTEX_NAME_ID_MAPPING);
+    Assert.assertEquals(1, vIdMap.size());
+    Assert.assertTrue(vIdMap.containsKey("foo"));
+    Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGFinishedEvent() {
+    long finishTime = random.nextLong();
+    long startTime = random.nextLong();
+    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    taskStats.put("FOO", 100);
+    taskStats.put("BAR", 200);
+
+    DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
+        "diagnostics", null, user, dagPlan.getName(), taskStats);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+            DAGState.ERROR.name()));
+
+    Assert.assertEquals(startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+    Assert.assertEquals(finishTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+    Assert.assertEquals(DAGState.ERROR.name(),
+        timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics",
+        timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+    Assert.assertEquals(100,
+        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200,
+        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexInitializedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    int numTasks = random.nextInt();
+    VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
+        initedTime, numTasks, "proc", null);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initedTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME));
+    Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME));
+
+    Assert.assertEquals(initedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+    Assert.assertEquals(initRequestedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
+    Assert.assertEquals(initedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+    Assert.assertEquals(numTasks,
+        ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexFinishedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    long startRequestedTime = random.nextLong();
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    taskStats.put("FOO", 100);
+    taskStats.put("BAR", 200);
+    VertexStats vertexStats = new VertexStats();
+
+    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1, initRequestedTime,
+        initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
+        "diagnostics", null, vertexStats, taskStats);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+            VertexState.ERROR.name()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(finishTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertEquals(VertexState.ERROR.name(),
+        timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics",
+        timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
+
+    Assert.assertEquals(100,
+        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200,
+        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskStartedEvent() {
+    long scheduleTime = random.nextLong();
+    long startTime = random.nextLong();
+    TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME));
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+
+    Assert.assertEquals(scheduleTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
+    Assert.assertEquals(startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+    Assert.assertTrue(TaskState.SCHEDULED.name()
+        .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskAttemptStartedEvent() {
+    long startTime = random.nextLong();
+    TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
+        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(3, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains(
+            containerId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+            tezTaskID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+            tezTaskID.toString()));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+    Assert.assertEquals("inProgressURL",
+        timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL));
+    Assert.assertEquals("logsURL",
+        timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL));
+    Assert.assertEquals(nodeId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID));
+    Assert.assertEquals(containerId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
+    Assert.assertEquals("nodeHttpAddress",
+        timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
+    Assert.assertTrue(TaskAttemptState.RUNNING.name()
+        .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskFinishedEvent() {
+    String vertexName = "testVertexName";
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+    TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)];
+    String diagnostics = "diagnostics message";
+    TezCounters counters = new TezCounters();
+
+    TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime,
+        tezTaskAttemptID, state, diagnostics, counters);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(4, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
+        .contains(tezDAGID.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name())
+        .contains(tezVertexID.toString()));
+    Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.name()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_FINISHED.name(), evt.getEventType());
+    Assert.assertEquals(finishTime, evt.getTimestamp());
+
+    final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
+    Assert.assertEquals(6, otherInfo.size());
+    Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
+    Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
+    Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
+    Assert.assertEquals(tezTaskAttemptID.toString(),
+        otherInfo.get(ATSConstants.SUCCESSFUL_ATTEMPT_ID));
+    Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
+    Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexParallelismUpdatedEvent() {
+    TezVertexID vId = tezVertexID;
+    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
+        new HashMap<String, EdgeManagerPluginDescriptor>();
+    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
+        edgeMgrs, null, 10);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
+    Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+
+    final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
+    Assert.assertEquals(2, primaryFilters.size());
+    Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+    Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
+        .contains(tezDAGID.toString()));
+
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType());
+    Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
+    Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS));
+    Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
+
+    Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
+        evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
+    Assert.assertEquals(1, updatedEdgeMgrs.size());
+    Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
+    Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
+
+    Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+
+    Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
+
+  }
+
+
+}

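The new test drives a single entry point, HistoryEventTimelineConversion.convertToTimelineEntity(HistoryEvent), once for every HistoryEventType and then verifies individual conversions field by field. As a rough standalone sketch (not part of the committed patch; the literal timestamps and user name are illustrative only, and the classes are those imported by the test above), converting one event reduces to:

    ApplicationId appId = ApplicationId.newInstance(9999L, 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    // AMLaunchedEvent(attemptId, launchTime, appSubmitTime, user), as used in testConvertAMLaunchedEvent
    AMLaunchedEvent amLaunched = new AMLaunchedEvent(attemptId, 100L, 90L, "user");
    TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(amLaunched);
    // Entity id/type follow the "tez_<attemptId>" / TEZ_APPLICATION_ATTEMPT convention asserted above
    System.out.println(entity.getEntityId() + " / " + entity.getEntityType());

The same pattern applies to every other event class exercised in testHandlerExists.
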
http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests
deleted file mode 120000
index 784b54a..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests
+++ /dev/null
@@ -1 +0,0 @@
-../../../../../../../tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests
\ No newline at end of file
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
new file mode 100644
index 0000000..d48948b
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.tests;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to
+ */
+public class MiniTezClusterWithTimeline extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  public MiniTezClusterWithTimeline(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+                        int numLocalDirs, int numLogDirs, boolean enableAHS)  {
+    super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    // Use libs from cluster since no build is available
+    conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true);
+    // blacklisting disabled to prevent scheduling issues
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+    
+    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+      // nothing defined. set quick delete value
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+    }
+    
+    File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR);
+
+    if (!appJarLocalFile.exists()) {
+      String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR
+          + " not found. Exiting.";
+      LOG.info(message);
+      throw new TezUncheckedException(message);
+    } else {
+      LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath());
+    }
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
+    Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
+    // Copy AppJar and make it public.
+    Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR);
+    fs.copyFromLocalFile(appMasterJar, appRemoteJar);
+    fs.setPermission(appRemoteJar, new FsPermission("777"));
+
+    conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
+    LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+
+    // Both PMEM and VMEM monitoring are disabled for the mini cluster.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      /*
+       * Re-configure the staging path on Windows if the file system is localFs.
+       * We need to use a absolute path that contains the drive letter. The unit
+       * test could run on a different drive than the AM. We can run into the
+       * issue that job files are localized to the drive where the test runs on,
+       * while the AM starts on a different drive and fails to find the job
+       * metafiles. Using absolute path can avoid this ambiguity.
+       */
+      if (Path.WINDOWS) {
+        if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
+          conf.set(MRJobConfig.MR_AM_STAGING_DIR,
+              new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
+                  .getAbsolutePath());
+        }
+      }
+      FileContext fc = FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      //mkdir done directory as well
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new TezUncheckedException("Could not create staging directory. ", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    //configure the shuffle service in NM
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Non-standard shuffle port
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+    // for corresponding uberized tests.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("Starting MiniTezClusterWithTimeline");
+    super.serviceStart();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      confFile.createNewFile();
+      conf.writeXml(new FileOutputStream(confFile));
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      // Fail fast: the mini cluster cannot be used if the config file cannot be written out.
+      throw new RuntimeException(e);
+    }
+    confFilePath = new Path(confFile.getAbsolutePath());
+    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    waitForAppsToFinish();
+    super.serviceStop();
+  }
+
+  private void waitForAppsToFinish() {
+    YarnClient yarnClient = YarnClient.createYarnClient(); 
+    yarnClient.init(getConfig());
+    yarnClient.start();
+    try {
+      while(true) {
+        List<ApplicationReport> appReports = yarnClient.getApplications();
+        Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){
+          @Override
+          public boolean apply(ApplicationReport appReport) {
+            return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+            YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+            .contains(appReport.getYarnApplicationState());
+          }
+        });
+        if (unCompletedApps.size() == 0) {
+          break;
+        }
+        LOG.info("wait for applications to finish in MiniTezClusterWithTimeline");
+        Thread.sleep(1000);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      yarnClient.stop();
+    }
+  }
+  
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}

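MiniTezClusterWithTimeline is a MiniYARNCluster subclass, so it follows the standard Hadoop Service lifecycle (init, start, stop). A hedged usage sketch, not part of the patch and with a placeholder test name, might look like:

    // Start the mini cluster with the application history service enabled.
    MiniTezClusterWithTimeline tezCluster =
        new MiniTezClusterWithTimeline("ExampleTest", 1, 1, 1, true /* enableAHS */);
    tezCluster.init(new Configuration());  // serviceInit() stages the Tez app jar and sets tez.lib.uris
    tezCluster.start();                    // serviceStart() writes yarn-site.xml into the test work dir
    Configuration clusterConf = tezCluster.getConfig();
    Path yarnSitePath = tezCluster.getConfigFilePath();
    // ... submit Tez DAGs against clusterConf ...
    tezCluster.stop();                     // serviceStop() waits for running YARN applications to finish

Note that stop() can block for a while, since waitForAppsToFinish() polls YarnClient once per second until no application is left in a non-terminal state.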
http://git-wip-us.apache.org/repos/asf/tez/blob/3629dbe8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
new file mode 100644
index 0000000..d48948b
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.tests;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to have the Tez AppJar
+ * available on the classpath and to pick up the generated cluster
+ * configuration via {@link #getConfigFilePath()}.
+ */
+public class MiniTezClusterWithTimeline extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  public MiniTezClusterWithTimeline(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+                        int numLocalDirs, int numLogDirs, boolean enableAHS)  {
+    super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
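+    // Route MapReduce jobs through the Tez runtime ("yarn-tez") on this mini cluster.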
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    // Use libs from cluster since no build is available
+    conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true);
+    // blacklisting disabled to prevent scheduling issues
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+    
+    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+      // Nothing defined; delete NM container directories immediately to keep test disk usage down.
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+    }
+    
+    File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR);
+
+    if (!appJarLocalFile.exists()) {
+      String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR
+          + " not found. Exiting.";
+      LOG.info(message);
+      throw new TezUncheckedException(message);
+    } else {
+      LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath());
+    }
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
+    Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
+    // Copy AppJar and make it public.
+    Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR);
+    fs.copyFromLocalFile(appMasterJar, appRemoteJar);
+    fs.setPermission(appRemoteJar, new FsPermission("777"));
+
+    conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
+    LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+
+    // Disable both physical and virtual memory monitoring for the mini cluster.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
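+    // Use a fully permissive umask so files created during the tests are readable
+    // by all of the mini cluster's daemons.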
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      /*
+       * Re-configure the staging path on Windows if the file system is localFs.
+       * We need to use an absolute path that contains the drive letter. The unit
+       * test could run on a different drive than the AM. We can run into the
+       * issue that job files are localized to the drive the test runs on,
+       * while the AM starts on a different drive and fails to find the job
+       * metafiles. Using an absolute path avoids this ambiguity.
+       */
+      if (Path.WINDOWS) {
+        if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
+          conf.set(MRJobConfig.MR_AM_STAGING_DIR,
+              new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
+                  .getAbsolutePath());
+        }
+      }
+      FileContext fc = FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      // Create the history server "done" directory as well.
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new TezUncheckedException("Could not create staging or history done directories", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    // Configure the shuffle service in the NMs.
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Use an ephemeral (OS-assigned) shuffle port so concurrent test runs do not collide.
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // Uber mode is a MapReduce-specific optimization; keep it disabled when running on Tez.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("Starting MiniTezClusterWithTimeline");
+    super.serviceStart();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
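+    // Persist the effective cluster configuration as yarn-site.xml so client-side
+    // code can load it via getConfigFilePath().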
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      confFile.createNewFile();
+      FileOutputStream confStream = new FileOutputStream(confFile);
+      try {
+        conf.writeXml(confStream);
+      } finally {
+        confStream.close();
+      }
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not write cluster configuration to " + confFile, e);
+    }
+    confFilePath = new Path(confFile.getAbsolutePath());
+    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    waitForAppsToFinish();
+    super.serviceStop();
+  }
+
+  private void waitForAppsToFinish() {
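+    // Block until the RM reports no applications in an incomplete state, so that
+    // cluster shutdown does not race with still-running test applications.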
+    YarnClient yarnClient = YarnClient.createYarnClient(); 
+    yarnClient.init(getConfig());
+    yarnClient.start();
+    try {
+      while (true) {
+        List<ApplicationReport> appReports = yarnClient.getApplications();
+        Collection<ApplicationReport> unCompletedApps =
+            Collections2.filter(appReports, new Predicate<ApplicationReport>() {
+              @Override
+              public boolean apply(ApplicationReport appReport) {
+                return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+                    YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED,
+                    YarnApplicationState.RUNNING)
+                    .contains(appReport.getYarnApplicationState());
+              }
+            });
+        if (unCompletedApps.isEmpty()) {
+          break;
+        }
+        LOG.info("Waiting for applications to finish in MiniTezClusterWithTimeline");
+        Thread.sleep(1000);
+      }
+    } catch (Exception e) {
+      LOG.error("Error while waiting for applications to finish", e);
+    } finally {
+      yarnClient.stop();
+    }
+  }
+  
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}
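
For reviewers who want to exercise the new class, here is a minimal usage sketch of
MiniTezClusterWithTimeline from a standalone driver. It is illustrative only and not part
of this patch: the class name ExampleMiniTezClusterUsage and the session name are
placeholders, while the cluster lifecycle calls (init/start/stop) and getConfigFilePath()
follow the code above, and the TezClient calls use the standard Tez client API.

package org.apache.tez.tests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;

// Illustrative harness only; not part of this patch.
public class ExampleMiniTezClusterUsage {

  public static void main(String[] args) throws Exception {
    // One NodeManager, one local dir, one log dir.
    MiniTezClusterWithTimeline tezCluster =
        new MiniTezClusterWithTimeline("ExampleMiniTezClusterUsage", 1, 1, 1);
    tezCluster.init(new Configuration());
    tezCluster.start();
    try {
      // serviceStart() writes the effective settings to yarn-site.xml;
      // build the client-side configuration from that file.
      Path confFile = tezCluster.getConfigFilePath();
      Configuration clusterConf = new Configuration();
      clusterConf.addResource(confFile);

      TezClient tezClient =
          TezClient.create("example-session", new TezConfiguration(clusterConf));
      tezClient.start();
      try {
        // Build and submit DAGs here, e.g. tezClient.submitDAG(dag),
        // then poll the returned DAGClient for completion.
      } finally {
        tezClient.stop();
      }
    } finally {
      // serviceStop() waits for submitted applications to finish before shutting down.
      tezCluster.stop();
    }
  }
}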

