tez-commits mailing list archives

From hit...@apache.org
Subject [1/3] TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)
Date Wed, 21 May 2014 22:13:13 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 6e07fc7e7 -> bc6579614


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
new file mode 100644
index 0000000..8ab9c91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+
+public class HistoryEventTimelineConversion {
+
+  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    TimelineEntity timelineEntity = null;
+    switch (historyEvent.getEventType()) {
+      case AM_LAUNCHED:
+        timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+        break;
+      case AM_STARTED:
+        timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
+        break;
+      case CONTAINER_LAUNCHED:
+        timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+        break;
+      case CONTAINER_STOPPED:
+        timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+        break;
+      case DAG_SUBMITTED:
+        timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+        break;
+      case DAG_INITIALIZED:
+        timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+        break;
+      case DAG_STARTED:
+        timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+        break;
+      case DAG_FINISHED:
+        timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+        break;
+      case VERTEX_INITIALIZED:
+        timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+        break;
+      case VERTEX_STARTED:
+        timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+        break;
+      case VERTEX_FINISHED:
+        timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+        break;
+      case TASK_STARTED:
+        timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+        break;
+      case TASK_FINISHED:
+        timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_STARTED:
+        timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case VERTEX_PARALLELISM_UPDATED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+    return timelineEntity;
+  }
+
+  private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.AM_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID,
+        event.getContainerId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    // The container may be stopped in a different application attempt than the one that launched it
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+
+    TimelineEvent stoppedEvt = new TimelineEvent();
+    stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
+    stoppedEvt.setTimestamp(event.getStoppedTime());
+    atsEntity.addEvent(stoppedEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+    finishEvt.setTimestamp(event.getInitTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.DAG_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+    submitEvt.setTimestamp(event.getSubmitTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.setStartTime(event.getSubmitTime());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
+
+    atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+        DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    atsEntity.addOtherInfo(ATSConstants.STATS,
+        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitedTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
+    atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+    atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+    return atsEntity;
+  }
+
+}
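
For reference, a minimal sketch of how this converter could be wired to a TimelineClient (hypothetical usage, not part of this patch; only HistoryEventTimelineConversion.convertToTimelineEntity and TimelineClient.putEntities come from the actual APIs, the surrounding class and method are illustrative, and ATSHistoryLoggingService does the equivalent work asynchronously from its event queue):

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.logging.ats.HistoryEventTimelineConversion;

public class TimelinePublishSketch {
  // Convert a single Tez history event and push it to the YARN timeline server.
  public static void publish(TimelineClient timelineClient, HistoryEvent historyEvent)
      throws IOException, YarnException {
    if (!historyEvent.isHistoryEvent()) {
      return; // convertToTimelineEntity rejects events that are not history events
    }
    TimelineEntity entity =
        HistoryEventTimelineConversion.convertToTimelineEntity(historyEvent);
    timelineClient.putEntities(entity);
  }
}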

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
new file mode 100644
index 0000000..7bd3b91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestATSHistoryLoggingService {
+
+  private ATSHistoryLoggingService atsHistoryLoggingService;
+  private AppContext appContext;
+  private Configuration conf;
+  private int atsInvokeCounter;
+  private SystemClock clock = new SystemClock();
+
+  @Before
+  public void setup() throws Exception {
+    appContext = mock(AppContext.class);
+    atsHistoryLoggingService = new ATSHistoryLoggingService();
+    atsHistoryLoggingService.setAppContext(appContext);
+    conf = new Configuration(false);
+    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        1000l);
+    atsInvokeCounter = 0;
+    atsHistoryLoggingService.init(conf);
+    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
+    atsHistoryLoggingService.start();
+    when(appContext.getClock()).thenReturn(clock);
+    when(atsHistoryLoggingService.timelineClient.putEntities(any(TimelineEntity.class))).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            ++atsInvokeCounter;
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              // do nothing
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @After
+  public void teardown() {
+    atsHistoryLoggingService.stop();
+    atsHistoryLoggingService = null;
+  }
+
+  @Test(timeout=20000)
+  public void testATSHistoryLoggingServiceShutdown() {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100l, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
+
+    for (int i = 0; i < 20; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    try {
+      Thread.sleep(2500l);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+    atsHistoryLoggingService.stop();
+
+    Assert.assertTrue(atsInvokeCounter >= 4);
+    Assert.assertTrue(atsInvokeCounter < 10);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..c605585
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventTimelineConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  @Test
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName());
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null);
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 171458f..ffea08c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezSession;
@@ -60,6 +61,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.mapreduce.examples.ExampleDriver;
 import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
@@ -67,6 +69,7 @@ import org.apache.tez.mapreduce.examples.IntersectValidate;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -362,5 +365,59 @@ public class TestTezJobs {
 
   }
 
+  // Submits a single-vertex sleep job, waits for it to succeed, and verifies that the
+  // SimpleHistoryLoggingService wrote a non-empty history log file to the configured directory.
+  @Test(timeout = 60000)
+  public void testHistoryLogging() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
+    localFs.mkdirs(historyLogDir);
+
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
+        localFs.makeQualified(historyLogDir).toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+    FileStatus historyLogFileStatus = null;
+    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
+      if (fileStatus.isDirectory()) {
+        continue;
+      }
+      Path p = fileStatus.getPath();
+      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
+        historyLogFileStatus = fileStatus;
+        break;
+      }
+    }
+    Assert.assertNotNull(historyLogFileStatus);
+    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
+  }
 
 }

