Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E2D7D179BB for ; Wed, 11 Mar 2015 04:38:55 +0000 (UTC) Received: (qmail 59734 invoked by uid 500); 11 Mar 2015 04:38:55 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 59641 invoked by uid 500); 11 Mar 2015 04:38:55 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 59612 invoked by uid 99); 11 Mar 2015 04:38:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Mar 2015 04:38:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 695A6E10BB; Wed, 11 Mar 2015 04:38:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Date: Wed, 11 Mar 2015 04:38:57 -0000 Message-Id: In-Reply-To: <60bc76ebbb24429db8caeb3380a400b4@git.apache.org> References: <60bc76ebbb24429db8caeb3380a400b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] tez git commit: TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. 
(hitesh) http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats deleted file mode 120000 index 5ef7ca9..0000000 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats +++ /dev/null @@ -1 +0,0 @@ -../../../../../../../../../../tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats \ No newline at end of file diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java new file mode 100644 index 0000000..c68d395 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; + +import com.google.common.annotations.VisibleForTesting; + +public class ATSHistoryLoggingService extends HistoryLoggingService { + + private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class); + + private LinkedBlockingQueue eventQueue = + new LinkedBlockingQueue(); + + private Thread eventHandlingThread; + private AtomicBoolean stopped = new AtomicBoolean(false); + private int eventCounter = 0; + private int eventsProcessed = 0; + private final Object lock = new Object(); + + @VisibleForTesting + TimelineClient timelineClient; + + private HashSet skippedDAGs = new 
HashSet(); + private Map dagDomainIdMap = new HashMap(); + private long maxTimeToWaitOnShutdown; + private boolean waitForeverOnShutdown = false; + + private int maxEventsPerBatch; + private long maxPollingTimeMillis; + + private String sessionDomainId; + private static final String atsHistoryACLManagerClassName = + "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; + private HistoryACLPolicyManager historyACLPolicyManager; + + public ATSHistoryLoggingService() { + super(ATSHistoryLoggingService.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing ATSService"); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + maxTimeToWaitOnShutdown = conf.getLong( + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); + maxEventsPerBatch = conf.getInt( + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT); + maxPollingTimeMillis = conf.getInt( + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); + if (maxTimeToWaitOnShutdown < 0) { + waitForeverOnShutdown = true; + } + sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); + + LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); + try { + historyACLPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + historyACLPolicyManager.setConf(conf); + } catch (TezUncheckedException e) { + LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName + + ". 
ACLs cannot be enforced correctly for history data in Timeline", e); + if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, + TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { + throw e; + } + historyACLPolicyManager = null; + } + + } + + @Override + public void serviceStart() { + LOG.info("Starting ATSService"); + timelineClient.start(); + + eventHandlingThread = new Thread(new Runnable() { + @Override + public void run() { + List events = new LinkedList(); + boolean interrupted = false; + while (!stopped.get() && !Thread.currentThread().isInterrupted() + && !interrupted) { + + // Log the size of the event-queue every so often. + if (eventCounter != 0 && eventCounter % 1000 == 0) { + if (eventsProcessed != 0 && !events.isEmpty()) { + LOG.info("Event queue stats" + + ", eventsProcessedSinceLastUpdate=" + eventsProcessed + + ", eventQueueSize=" + eventQueue.size()); + } + eventCounter = 0; + eventsProcessed = 0; + } else { + ++eventCounter; + } + + synchronized (lock) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + // Finish processing events and then return + interrupted = true; + } + + if (events.isEmpty()) { + continue; + } + + eventsProcessed += events.size(); + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling events", e); + } + } + } + } + }, "HistoryEventHandlingThread"); + eventHandlingThread.start(); + } + + @Override + public void serviceStop() { + LOG.info("Stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + stopped.set(true); + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + } + synchronized (lock) { + if (!eventQueue.isEmpty()) { + LOG.warn("ATSService being stopped" + + ", eventQueueBacklog=" + eventQueue.size() + + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown + + ", waitForever=" + waitForeverOnShutdown); + long startTime = appContext.getClock().getTime(); + long endTime = startTime + 
maxTimeToWaitOnShutdown; + List events = new LinkedList(); + while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + LOG.info("ATSService interrupted while shutting down. Exiting." + + " EventQueueBacklog=" + eventQueue.size()); + } + if (events.isEmpty()) { + LOG.info("Event queue empty, stopping ATS Service"); + break; + } + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling event", e); + break; + } + } + } + } + if (!eventQueue.isEmpty()) { + LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + } + timelineClient.stop(); + } + + private void getEventBatch(List events) throws InterruptedException { + events.clear(); + int counter = 0; + while (counter < maxEventsPerBatch) { + DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); + if (event == null) { + break; + } + if (!isValidEvent(event)) { + continue; + } + ++counter; + events.add(event); + if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) { + // Special case this as it might be a large payload + break; + } + } + } + + + public void handle(DAGHistoryEvent event) { + eventQueue.add(event); + } + + private boolean isValidEvent(DAGHistoryEvent event) { + HistoryEventType eventType = event.getHistoryEvent().getEventType(); + TezDAGID dagId = event.getDagID(); + + if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { + DAGSubmittedEvent dagSubmittedEvent = + (DAGSubmittedEvent) event.getHistoryEvent(); + String dagName = dagSubmittedEvent.getDAGName(); + if (dagName != null + && dagName.startsWith( + TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { + // Skip recording pre-warm DAG events + skippedDAGs.add(dagId); + return false; + } + if (historyACLPolicyManager != null) { + String dagDomainId = dagSubmittedEvent.getConf().get( + 
TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); + if (dagDomainId != null) { + dagDomainIdMap.put(dagId, dagDomainId); + } + } + } + if (eventType.equals(HistoryEventType.DAG_FINISHED)) { + // Remove from set to keep size small + // No more events should be seen after this point. + if (skippedDAGs.remove(dagId)) { + return false; + } + } + + if (dagId != null && skippedDAGs.contains(dagId)) { + // Skip pre-warm DAGs + return false; + } + + return true; + } + + private void handleEvents(List events) { + TimelineEntity[] entities = new TimelineEntity[events.size()]; + for (int i = 0; i < events.size(); ++i) { + DAGHistoryEvent event = events.get(i); + String domainId = sessionDomainId; + TezDAGID dagId = event.getDagID(); + + if (historyACLPolicyManager != null && dagId != null) { + if (dagDomainIdMap.containsKey(dagId)) { + domainId = dagDomainIdMap.get(dagId); + } + } + + entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending event batch to Timeline, batchSize=" + events.size()); + } + try { + TimelinePutResponse response = + timelineClient.putEntities(entities); + if (response != null + && !response.getErrors().isEmpty()) { + int count = response.getErrors().size(); + for (int i = 0; i < count; ++i) { + TimelinePutError err = response.getErrors().get(i); + if (err.getErrorCode() != 0) { + LOG.warn("Could not post history event to ATS" + + ", atsPutError=" + err.getErrorCode() + + ", entityId=" + entities[i].getEntityId() + + ", eventType=" + events.get(i).getHistoryEvent().getEventType()); + } + } + } + // Do nothing additional, ATS client library should handle throttling + // or auto-disable as needed + } catch (Exception e) { + LOG.warn("Could not handle history events", e); + } + } + +} diff 
--git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java new file mode 100644 index 0000000..ca47b92 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezVertexID; + +public class HistoryEventTimelineConversion { + + public static 
TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) { + if (!historyEvent.isHistoryEvent()) { + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + } + TimelineEntity timelineEntity; + switch (historyEvent.getEventType()) { + case APP_LAUNCHED: + timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); + break; + case AM_LAUNCHED: + timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); + break; + case AM_STARTED: + timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent); + break; + case CONTAINER_LAUNCHED: + timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); + break; + case CONTAINER_STOPPED: + timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); + break; + case DAG_SUBMITTED: + timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); + break; + case DAG_INITIALIZED: + timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); + break; + case DAG_STARTED: + timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent); + break; + case DAG_FINISHED: + timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); + break; + case VERTEX_INITIALIZED: + timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); + break; + case VERTEX_STARTED: + timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent); + break; + case VERTEX_FINISHED: + timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); + break; + case TASK_STARTED: + timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent); + break; + case TASK_FINISHED: + timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); + break; + case TASK_ATTEMPT_STARTED: + timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) 
historyEvent); + break; + case TASK_ATTEMPT_FINISHED: + timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); + break; + case VERTEX_PARALLELISM_UPDATED: + timelineEntity = convertVertexParallelismUpdatedEvent( + (VertexParallelismUpdatedEvent) historyEvent); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_COMMIT_STARTED: + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + default: + throw new UnsupportedOperationException("Unhandled Event" + + ", eventType=" + historyEvent.getEventType()); + } + return timelineEntity; + } + + private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + + atsEntity.addOtherInfo(ATSConstants.CONFIG, + DAGUtils.convertConfigurationToATSMap(event.getConf())); + + atsEntity.setStartTime(event.getLaunchTime()); + + if (event.getVersion() != null) { + atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION, + DAGUtils.convertTezVersionToATSMap(event.getVersion())); + } + + return atsEntity; + } + + private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + 
event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); + + return atsEntity; + } + + private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.AM_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + return atsEntity; + } + + private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, + event.getContainerId().toString()); + + 
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + return atsEntity; + } + + private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + // In case, a container is stopped in a different attempt + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + TimelineEvent stoppedEvt = new TimelineEvent(); + stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name()); + stoppedEvt.setTimestamp(event.getStoppedTime()); + atsEntity.addEvent(stoppedEvt); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus()); + + atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); + + return atsEntity; + } + + private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + 
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + final Map dagTaskStats = event.getDagTaskStats(); + if (dagTaskStats != null) { + for(Entry entry : dagTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); + } + } + + return atsEntity; + } + + private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitTime()); + atsEntity.addEvent(initEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime()); + + if (event.getVertexNameIDMap() != null) { + Map nameIdStrMap = new TreeMap(); + for (Entry entry : event.getVertexNameIDMap().entrySet()) { + nameIdStrMap.put(entry.getKey(), entry.getValue().toString()); + } + 
atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap); + } + + return atsEntity; + } + + private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.DAG_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString()); + + return atsEntity; + } + + private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(), + "tez_" + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + TimelineEvent submitEvt = new TimelineEvent(); + submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name()); + submitEvt.setTimestamp(event.getSubmitTime()); + atsEntity.addEvent(submitEvt); + + 
atsEntity.setStartTime(event.getSubmitTime()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + + try { + atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + return atsEntity; + } + + private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if 
(event.getTaskAttemptError() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getCounters())); + + return atsEntity; + } + + private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + 
atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); + + return atsEntity; + } + + private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getSuccessfulAttemptID() != null) { + atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID, + event.getSuccessfulAttemptID().toString()); + } + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + return atsEntity; + } + + private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) { + TimelineEntity atsEntity = new 
TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name()); + + return atsEntity; + } + + private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + 
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + atsEntity.addOtherInfo(ATSConstants.STATS, + DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + + final Map vertexTaskStats = event.getVertexTaskStats(); + if (vertexTaskStats != null) { + for(Entry entry : vertexTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); + } + } + + return atsEntity; + } + + private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitedTime()); + atsEntity.addEvent(initEvt); + + atsEntity.setStartTime(event.getInitedTime()); + + atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); + atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime()); + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + + return atsEntity; + } + + private static 
TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString()); + + return atsEntity; + } + + private static TimelineEntity convertVertexParallelismUpdatedEvent( + VertexParallelismUpdatedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent updateEvt = new TimelineEvent(); + updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); + updateEvt.setTimestamp(event.getUpdateTime()); + + Map eventInfo = new HashMap(); + if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { + Map updatedEdgeManagers = new HashMap(); + for (Entry entry : + event.getSourceEdgeManagers().entrySet()) { + updatedEdgeManagers.put(entry.getKey(), + DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())); + } + 
eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); + } + eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); + eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); + updateEvt.setEventInfo(eventInfo); + atsEntity.addEvent(updateEvt); + + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + + return atsEntity; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java new file mode 100644 index 0000000..c68d395 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.ats; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; + +import com.google.common.annotations.VisibleForTesting; + +public class ATSHistoryLoggingService extends HistoryLoggingService { + + private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class); + + private LinkedBlockingQueue eventQueue = + new LinkedBlockingQueue(); + + private Thread eventHandlingThread; + private AtomicBoolean stopped = new AtomicBoolean(false); + private int eventCounter = 0; + private int eventsProcessed = 0; + private final Object lock = new Object(); + + @VisibleForTesting + TimelineClient timelineClient; + + private HashSet skippedDAGs = new HashSet(); + private Map dagDomainIdMap = new HashMap(); + private long maxTimeToWaitOnShutdown; + private 
boolean waitForeverOnShutdown = false; + + private int maxEventsPerBatch; + private long maxPollingTimeMillis; + + private String sessionDomainId; + private static final String atsHistoryACLManagerClassName = + "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; + private HistoryACLPolicyManager historyACLPolicyManager; + + public ATSHistoryLoggingService() { + super(ATSHistoryLoggingService.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing ATSService"); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + maxTimeToWaitOnShutdown = conf.getLong( + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); + maxEventsPerBatch = conf.getInt( + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT); + maxPollingTimeMillis = conf.getInt( + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); + if (maxTimeToWaitOnShutdown < 0) { + waitForeverOnShutdown = true; + } + sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); + + LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); + try { + historyACLPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + historyACLPolicyManager.setConf(conf); + } catch (TezUncheckedException e) { + LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName + + ". 
ACLs cannot be enforced correctly for history data in Timeline", e); + if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, + TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { + throw e; + } + historyACLPolicyManager = null; + } + + } + + @Override + public void serviceStart() { + LOG.info("Starting ATSService"); + timelineClient.start(); + + eventHandlingThread = new Thread(new Runnable() { + @Override + public void run() { + List events = new LinkedList(); + boolean interrupted = false; + while (!stopped.get() && !Thread.currentThread().isInterrupted() + && !interrupted) { + + // Log the size of the event-queue every so often. + if (eventCounter != 0 && eventCounter % 1000 == 0) { + if (eventsProcessed != 0 && !events.isEmpty()) { + LOG.info("Event queue stats" + + ", eventsProcessedSinceLastUpdate=" + eventsProcessed + + ", eventQueueSize=" + eventQueue.size()); + } + eventCounter = 0; + eventsProcessed = 0; + } else { + ++eventCounter; + } + + synchronized (lock) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + // Finish processing events and then return + interrupted = true; + } + + if (events.isEmpty()) { + continue; + } + + eventsProcessed += events.size(); + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling events", e); + } + } + } + } + }, "HistoryEventHandlingThread"); + eventHandlingThread.start(); + } + + @Override + public void serviceStop() { + LOG.info("Stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + stopped.set(true); + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + } + synchronized (lock) { + if (!eventQueue.isEmpty()) { + LOG.warn("ATSService being stopped" + + ", eventQueueBacklog=" + eventQueue.size() + + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown + + ", waitForever=" + waitForeverOnShutdown); + long startTime = appContext.getClock().getTime(); + long endTime = startTime + 
maxTimeToWaitOnShutdown; + List events = new LinkedList(); + while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + LOG.info("ATSService interrupted while shutting down. Exiting." + + " EventQueueBacklog=" + eventQueue.size()); + } + if (events.isEmpty()) { + LOG.info("Event queue empty, stopping ATS Service"); + break; + } + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling event", e); + break; + } + } + } + } + if (!eventQueue.isEmpty()) { + LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + } + timelineClient.stop(); + } + + private void getEventBatch(List events) throws InterruptedException { + events.clear(); + int counter = 0; + while (counter < maxEventsPerBatch) { + DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); + if (event == null) { + break; + } + if (!isValidEvent(event)) { + continue; + } + ++counter; + events.add(event); + if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) { + // Special case this as it might be a large payload + break; + } + } + } + + + public void handle(DAGHistoryEvent event) { + eventQueue.add(event); + } + + private boolean isValidEvent(DAGHistoryEvent event) { + HistoryEventType eventType = event.getHistoryEvent().getEventType(); + TezDAGID dagId = event.getDagID(); + + if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { + DAGSubmittedEvent dagSubmittedEvent = + (DAGSubmittedEvent) event.getHistoryEvent(); + String dagName = dagSubmittedEvent.getDAGName(); + if (dagName != null + && dagName.startsWith( + TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { + // Skip recording pre-warm DAG events + skippedDAGs.add(dagId); + return false; + } + if (historyACLPolicyManager != null) { + String dagDomainId = dagSubmittedEvent.getConf().get( + 
TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); + if (dagDomainId != null) { + dagDomainIdMap.put(dagId, dagDomainId); + } + } + } + if (eventType.equals(HistoryEventType.DAG_FINISHED)) { + // Remove from set to keep size small + // No more events should be seen after this point. + if (skippedDAGs.remove(dagId)) { + return false; + } + } + + if (dagId != null && skippedDAGs.contains(dagId)) { + // Skip pre-warm DAGs + return false; + } + + return true; + } + + private void handleEvents(List events) { + TimelineEntity[] entities = new TimelineEntity[events.size()]; + for (int i = 0; i < events.size(); ++i) { + DAGHistoryEvent event = events.get(i); + String domainId = sessionDomainId; + TezDAGID dagId = event.getDagID(); + + if (historyACLPolicyManager != null && dagId != null) { + if (dagDomainIdMap.containsKey(dagId)) { + domainId = dagDomainIdMap.get(dagId); + } + } + + entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending event batch to Timeline, batchSize=" + events.size()); + } + try { + TimelinePutResponse response = + timelineClient.putEntities(entities); + if (response != null + && !response.getErrors().isEmpty()) { + int count = response.getErrors().size(); + for (int i = 0; i < count; ++i) { + TimelinePutError err = response.getErrors().get(i); + if (err.getErrorCode() != 0) { + LOG.warn("Could not post history event to ATS" + + ", atsPutError=" + err.getErrorCode() + + ", entityId=" + entities[i].getEntityId() + + ", eventType=" + events.get(i).getHistoryEvent().getEventType()); + } + } + } + // Do nothing additional, ATS client library should handle throttling + // or auto-disable as needed + } catch (Exception e) { + LOG.warn("Could not handle history events", e); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java new file mode 100644 index 0000000..ca47b92 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezVertexID; + +public class HistoryEventTimelineConversion { + + public static 
TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) { + if (!historyEvent.isHistoryEvent()) { + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + } + TimelineEntity timelineEntity; + switch (historyEvent.getEventType()) { + case APP_LAUNCHED: + timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); + break; + case AM_LAUNCHED: + timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); + break; + case AM_STARTED: + timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent); + break; + case CONTAINER_LAUNCHED: + timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); + break; + case CONTAINER_STOPPED: + timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); + break; + case DAG_SUBMITTED: + timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); + break; + case DAG_INITIALIZED: + timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); + break; + case DAG_STARTED: + timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent); + break; + case DAG_FINISHED: + timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); + break; + case VERTEX_INITIALIZED: + timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); + break; + case VERTEX_STARTED: + timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent); + break; + case VERTEX_FINISHED: + timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); + break; + case TASK_STARTED: + timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent); + break; + case TASK_FINISHED: + timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); + break; + case TASK_ATTEMPT_STARTED: + timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) 
historyEvent); + break; + case TASK_ATTEMPT_FINISHED: + timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); + break; + case VERTEX_PARALLELISM_UPDATED: + timelineEntity = convertVertexParallelismUpdatedEvent( + (VertexParallelismUpdatedEvent) historyEvent); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_COMMIT_STARTED: + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + default: + throw new UnsupportedOperationException("Unhandled Event" + + ", eventType=" + historyEvent.getEventType()); + } + return timelineEntity; + } + + private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + + atsEntity.addOtherInfo(ATSConstants.CONFIG, + DAGUtils.convertConfigurationToATSMap(event.getConf())); + + atsEntity.setStartTime(event.getLaunchTime()); + + if (event.getVersion() != null) { + atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION, + DAGUtils.convertTezVersionToATSMap(event.getVersion())); + } + + return atsEntity; + } + + private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + 
event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); + + return atsEntity; + } + + private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.AM_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + return atsEntity; + } + + private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, + event.getContainerId().toString()); + + 
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + return atsEntity; + } + /** Builds the TEZ_CONTAINER_ID timeline entity (id is "tez_" + container id) for a ContainerStoppedEvent: links it to the application attempt carried by the event, adds a CONTAINER_STOPPED event at the stopped time, filters on application id and exit status, and records EXIT_STATUS and FINISH_TIME as other info. */ + private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + // The stop may be reported by a different attempt than the one that launched the container, so link the reporting attempt + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + TimelineEvent stoppedEvt = new TimelineEvent(); + stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name()); + stoppedEvt.setTimestamp(event.getStoppedTime()); + atsEntity.addEvent(stoppedEvt); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus()); + + atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); + + return atsEntity; + } + /** Builds the TEZ_DAG_ID timeline entity for a DAGFinishedEvent: adds a DAG_FINISHED event at the finish time, filters on user, application id, DAG name and final state, and records start/finish times, TIME_TAKEN (finish minus start), status, diagnostics, counters and, when present, each DAG task statistic as its own other-info key. */ + private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + 
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + /* When the stats map is present, each of its entries is copied as a separate other-info key. NOTE(review): the raw Map/Entry types here look like generic type arguments lost when this text was archived; confirm the real declarations against the repository source. */ + final Map dagTaskStats = event.getDagTaskStats(); + if (dagTaskStats != null) { + for(Entry entry : dagTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); + } + } + + return atsEntity; + } + /** Builds the TEZ_DAG_ID timeline entity for a DAGInitializedEvent: adds a DAG_INITIALIZED event at the init time, filters on user, application id and DAG name, records INIT_TIME and, when the event carries a vertex name/ID map, a copy of it in a TreeMap (sorted by key) with each value converted via toString(). */ + private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitTime()); + atsEntity.addEvent(initEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime()); + + if (event.getVertexNameIDMap() != null) { + Map nameIdStrMap = new TreeMap(); + for (Entry entry : event.getVertexNameIDMap().entrySet()) { + nameIdStrMap.put(entry.getKey(), entry.getValue().toString()); + } + 
atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap); + } + + return atsEntity; + } + /** Builds the TEZ_DAG_ID timeline entity for a DAGStartedEvent: adds a DAG_STARTED event at the start time, filters on user, application id and DAG name, and records START_TIME and the DAG state as STATUS. */ + private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.DAG_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString()); + + return atsEntity; + } + /** Builds the TEZ_DAG_ID timeline entity for a DAGSubmittedEvent, using the submit time as the entity start time. Relates the DAG to its "tez_"-prefixed application and application-attempt entities and to the raw application id, attempt id and user; adds a DAG_SUBMITTED event; filters on user, DAG name and application id; and stores the converted DAG plan and the application id as other info. An IOException from the DAG plan conversion is rethrown as TezUncheckedException. */ + private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(), + "tez_" + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + TimelineEvent submitEvt = new TimelineEvent(); + submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name()); + submitEvt.setTimestamp(event.getSubmitTime()); + atsEntity.addEvent(submitEvt); + 
atsEntity.setStartTime(event.getSubmitTime()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + /* The plan converter declares IOException; surface it unchecked since this converter's signature has no throws clause. */ + try { + atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + return atsEntity; + } + /** Builds the TEZ_TASK_ATTEMPT_ID timeline entity for a TaskAttemptFinishedEvent: filters on application, DAG, vertex and task ids plus the final state, adds a TASK_ATTEMPT_FINISHED event at the finish time, and records FINISH_TIME, TIME_TAKEN (finish minus start), status, the attempt error enum when one is set, diagnostics and counters. */ + private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if 
(event.getTaskAttemptError() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getCounters())); + + return atsEntity; + } + /** Builds the TEZ_TASK_ATTEMPT_ID timeline entity for a TaskAttemptStartedEvent, using the start time as the entity start time: relates it to its node, container and task, filters on application, DAG, vertex and task ids, adds a TASK_ATTEMPT_STARTED event, and records START_TIME, the in-progress and completed log URLs, node id, node HTTP address, container id and a RUNNING status. */ + private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + 
atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); + + return atsEntity; + } + /** Builds the TEZ_TASK_ID timeline entity for a TaskFinishedEvent: filters on application, DAG and vertex ids plus the final state, adds a TASK_FINISHED event at the finish time, and records FINISH_TIME, TIME_TAKEN (finish minus start), status, the successful attempt id when present, diagnostics and counters. */ + private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getSuccessfulAttemptID() != null) { + atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID, + event.getSuccessfulAttemptID().toString()); + } + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + return atsEntity; + } + /** Builds the TEZ_TASK_ID timeline entity for a TaskStartedEvent, using the start time as the entity start time: relates it to its vertex, filters on application, DAG and vertex ids, adds a TASK_STARTED event, and records START_TIME, SCHEDULED_TIME and a status. */ + private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) { + TimelineEntity atsEntity = new 
TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + /* NOTE(review): the started event records STATUS as SCHEDULED (the task-attempt started event uses RUNNING); confirm this is intended. */ atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name()); + + return atsEntity; + } + /** Builds the TEZ_VERTEX_ID timeline entity for a VertexFinishedEvent: filters on application and DAG ids plus the final state, adds a VERTEX_FINISHED event at the finish time, and records FINISH_TIME, TIME_TAKEN (finish minus start), status, diagnostics, counters, vertex stats and, when present, each vertex task statistic as its own other-info key. */ + private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + 
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + atsEntity.addOtherInfo(ATSConstants.STATS, + DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + /* When the stats map is present, each of its entries is copied as a separate other-info key. NOTE(review): raw Map/Entry here likely reflect generic type arguments stripped from this archived copy; confirm against the repository source. */ + final Map vertexTaskStats = event.getVertexTaskStats(); + if (vertexTaskStats != null) { + for(Entry entry : vertexTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); + } + } + + return atsEntity; + } + /** Builds the TEZ_VERTEX_ID timeline entity for a VertexInitializedEvent, using the inited time as the entity start time: relates it to its DAG, filters on application and DAG ids, adds a VERTEX_INITIALIZED event, and records the vertex name, init-requested time, init time, task count and processor class name. */ + private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitedTime()); + atsEntity.addEvent(initEvt); + + atsEntity.setStartTime(event.getInitedTime()); + + atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); + atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime()); + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + + return atsEntity; + } + /** Builds the TEZ_VERTEX_ID timeline entity for a VertexStartedEvent: filters on application and DAG ids, adds a VERTEX_STARTED event at the start time, and records START_REQUESTED_TIME, START_TIME and the vertex state as STATUS. */ + private static 
TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString()); + + return atsEntity; + } + /** Builds the TEZ_VERTEX_ID timeline entity for a VertexParallelismUpdatedEvent: filters on application and DAG ids and adds a VERTEX_PARALLELISM_UPDATED event whose info holds the new and old task counts plus, only when a non-empty source edge-manager map was supplied, each descriptor converted via DAGUtils under its original key; the new task count is also stored as NUM_TASKS other info. */ + private static TimelineEntity convertVertexParallelismUpdatedEvent( + VertexParallelismUpdatedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent updateEvt = new TimelineEvent(); + updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); + updateEvt.setTimestamp(event.getUpdateTime()); + + Map eventInfo = new HashMap(); + if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { + Map updatedEdgeManagers = new HashMap(); + for (Entry entry : + event.getSourceEdgeManagers().entrySet()) { + updatedEdgeManagers.put(entry.getKey(), + DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())); + } + 
eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); + } + eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); + eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); + updateEvt.setEventInfo(eventInfo); + atsEntity.addEvent(updateEvt); + + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + + return atsEntity; + } + +}