Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E43117BC1 for ; Fri, 27 Mar 2015 00:57:26 +0000 (UTC) Received: (qmail 15393 invoked by uid 500); 27 Mar 2015 00:57:19 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 15320 invoked by uid 500); 27 Mar 2015 00:57:19 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 15120 invoked by uid 99); 27 Mar 2015 00:57:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Mar 2015 00:57:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 68B45E2F3A; Fri, 27 Mar 2015 00:57:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 27 Mar 2015 00:57:23 -0000 Message-Id: <4a1098d9037c40408200b9e59da146e0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/52] [abbrv] tez git commit: TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh) TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history and tez-yarn-timeline-history-with-acls. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6692c514 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6692c514 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6692c514 Branch: refs/heads/TEZ-2003 Commit: 6692c514efa2b8bdc1f19bead9dedc3d1bb3d88b Parents: 35a2f3c Author: Hitesh Shah Authored: Tue Mar 10 21:37:39 2015 -0700 Committer: Hitesh Shah Committed: Tue Mar 10 21:37:39 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../logging/ats/ATSHistoryLoggingService.java | 325 ------- .../ats/HistoryEventTimelineConversion.java | 631 -------------- .../ats/TestATSHistoryLoggingService.java | 145 ---- .../ats/TestATSHistoryWithMiniCluster.java | 242 ------ .../ats/TestHistoryEventTimelineConversion.java | 850 ------------------- .../tez/tests/MiniTezClusterWithTimeline.java | 253 ------ .../java/org/apache/tez/dag/history/logging/ats | 1 - .../logging/ats/ATSHistoryLoggingService.java | 325 +++++++ .../ats/HistoryEventTimelineConversion.java | 631 ++++++++++++++ .../java/org/apache/tez/dag/history/logging/ats | 1 - .../ats/TestATSHistoryLoggingService.java | 145 ++++ .../ats/TestATSHistoryWithMiniCluster.java | 242 ++++++ .../ats/TestHistoryEventTimelineConversion.java | 850 +++++++++++++++++++ .../src/test/java/org/apache/tez/tests | 1 - .../tez/tests/MiniTezClusterWithTimeline.java | 253 ++++++ 16 files changed, 2448 insertions(+), 2449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 67cc178..44a13b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -80,6 +80,8 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2168. Fix application dependencies on mutually exclusive artifacts: tez-yarn-timeline-history + and tez-yarn-timeline-history-with-acls. TEZ-2190. TestOrderedWordCount fails when generateSplitsInClient set to true. TEZ-2091. Add support for hosting TEZ_UI with nodejs. TEZ-2165. Tez UI: DAG shows running status if killed by RM in some cases. http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java deleted file mode 100644 index c68d395..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ /dev/null @@ -1,325 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.security.HistoryACLPolicyManager; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.DAGSubmittedEvent; -import org.apache.tez.dag.history.logging.HistoryLoggingService; -import org.apache.tez.dag.records.TezDAGID; - -import com.google.common.annotations.VisibleForTesting; - -public class ATSHistoryLoggingService extends HistoryLoggingService { - - private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class); - - private LinkedBlockingQueue eventQueue = - new LinkedBlockingQueue(); - - private Thread eventHandlingThread; - private AtomicBoolean stopped = new AtomicBoolean(false); - private int eventCounter = 0; - private int eventsProcessed = 0; - private final Object lock = new Object(); - - @VisibleForTesting - TimelineClient timelineClient; - - private HashSet skippedDAGs = new HashSet(); - private Map dagDomainIdMap = new HashMap(); - private long maxTimeToWaitOnShutdown; - private boolean waitForeverOnShutdown = false; - - private int maxEventsPerBatch; - private long maxPollingTimeMillis; - - private String sessionDomainId; - private static final String atsHistoryACLManagerClassName = - "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; - private HistoryACLPolicyManager historyACLPolicyManager; - - public ATSHistoryLoggingService() { - super(ATSHistoryLoggingService.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing ATSService"); - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); - maxTimeToWaitOnShutdown = conf.getLong( - TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, - TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); - maxEventsPerBatch = conf.getInt( - TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, - TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT); - maxPollingTimeMillis = conf.getInt( - TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, - TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); - if (maxTimeToWaitOnShutdown < 0) { - waitForeverOnShutdown = true; - } - sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); - - LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); - try { - historyACLPolicyManager = ReflectionUtils.createClazzInstance( - atsHistoryACLManagerClassName); - historyACLPolicyManager.setConf(conf); - } catch (TezUncheckedException e) { - LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName - + ". ACLs cannot be enforced correctly for history data in Timeline", e); - if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, - TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { - throw e; - } - historyACLPolicyManager = null; - } - - } - - @Override - public void serviceStart() { - LOG.info("Starting ATSService"); - timelineClient.start(); - - eventHandlingThread = new Thread(new Runnable() { - @Override - public void run() { - List events = new LinkedList(); - boolean interrupted = false; - while (!stopped.get() && !Thread.currentThread().isInterrupted() - && !interrupted) { - - // Log the size of the event-queue every so often. - if (eventCounter != 0 && eventCounter % 1000 == 0) { - if (eventsProcessed != 0 && !events.isEmpty()) { - LOG.info("Event queue stats" - + ", eventsProcessedSinceLastUpdate=" + eventsProcessed - + ", eventQueueSize=" + eventQueue.size()); - } - eventCounter = 0; - eventsProcessed = 0; - } else { - ++eventCounter; - } - - synchronized (lock) { - try { - getEventBatch(events); - } catch (InterruptedException e) { - // Finish processing events and then return - interrupted = true; - } - - if (events.isEmpty()) { - continue; - } - - eventsProcessed += events.size(); - try { - handleEvents(events); - } catch (Exception e) { - LOG.warn("Error handling events", e); - } - } - } - } - }, "HistoryEventHandlingThread"); - eventHandlingThread.start(); - } - - @Override - public void serviceStop() { - LOG.info("Stopping ATSService" - + ", eventQueueBacklog=" + eventQueue.size()); - stopped.set(true); - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); - } - synchronized (lock) { - if (!eventQueue.isEmpty()) { - LOG.warn("ATSService being stopped" - + ", eventQueueBacklog=" + eventQueue.size() - + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown - + ", waitForever=" + waitForeverOnShutdown); - long startTime = appContext.getClock().getTime(); - long endTime = startTime + maxTimeToWaitOnShutdown; - List events = new LinkedList(); - while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) { - try { - getEventBatch(events); - } catch (InterruptedException e) { - LOG.info("ATSService interrupted while shutting down. Exiting." - + " EventQueueBacklog=" + eventQueue.size()); - } - if (events.isEmpty()) { - LOG.info("Event queue empty, stopping ATS Service"); - break; - } - try { - handleEvents(events); - } catch (Exception e) { - LOG.warn("Error handling event", e); - break; - } - } - } - } - if (!eventQueue.isEmpty()) { - LOG.warn("Did not finish flushing eventQueue before stopping ATSService" - + ", eventQueueBacklog=" + eventQueue.size()); - } - timelineClient.stop(); - } - - private void getEventBatch(List events) throws InterruptedException { - events.clear(); - int counter = 0; - while (counter < maxEventsPerBatch) { - DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); - if (event == null) { - break; - } - if (!isValidEvent(event)) { - continue; - } - ++counter; - events.add(event); - if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) { - // Special case this as it might be a large payload - break; - } - } - } - - - public void handle(DAGHistoryEvent event) { - eventQueue.add(event); - } - - private boolean isValidEvent(DAGHistoryEvent event) { - HistoryEventType eventType = event.getHistoryEvent().getEventType(); - TezDAGID dagId = event.getDagID(); - - if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { - DAGSubmittedEvent dagSubmittedEvent = - (DAGSubmittedEvent) event.getHistoryEvent(); - String dagName = dagSubmittedEvent.getDAGName(); - if (dagName != null - && dagName.startsWith( - TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { - // Skip recording pre-warm DAG events - skippedDAGs.add(dagId); - return false; - } - if (historyACLPolicyManager != null) { - String dagDomainId = dagSubmittedEvent.getConf().get( - TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); - if (dagDomainId != null) { - dagDomainIdMap.put(dagId, dagDomainId); - } - } - } - if (eventType.equals(HistoryEventType.DAG_FINISHED)) { - // Remove from set to keep size small - // No more events should be seen after this point. - if (skippedDAGs.remove(dagId)) { - return false; - } - } - - if (dagId != null && skippedDAGs.contains(dagId)) { - // Skip pre-warm DAGs - return false; - } - - return true; - } - - private void handleEvents(List events) { - TimelineEntity[] entities = new TimelineEntity[events.size()]; - for (int i = 0; i < events.size(); ++i) { - DAGHistoryEvent event = events.get(i); - String domainId = sessionDomainId; - TezDAGID dagId = event.getDagID(); - - if (historyACLPolicyManager != null && dagId != null) { - if (dagDomainIdMap.containsKey(dagId)) { - domainId = dagDomainIdMap.get(dagId); - } - } - - entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); - if (historyACLPolicyManager != null) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); - } - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Sending event batch to Timeline, batchSize=" + events.size()); - } - try { - TimelinePutResponse response = - timelineClient.putEntities(entities); - if (response != null - && !response.getErrors().isEmpty()) { - int count = response.getErrors().size(); - for (int i = 0; i < count; ++i) { - TimelinePutError err = response.getErrors().get(i); - if (err.getErrorCode() != 0) { - LOG.warn("Could not post history event to ATS" - + ", atsPutError=" + err.getErrorCode() - + ", entityId=" + entities[i].getEntityId() - + ", eventType=" + events.get(i).getHistoryEvent().getEventType()); - } - } - } - // Do nothing additional, ATS client library should handle throttling - // or auto-disable as needed - } catch (Exception e) { - LOG.warn("Could not handle history events", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java deleted file mode 100644 index ca47b92..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ /dev/null @@ -1,631 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.tez.common.ATSConstants; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.AMLaunchedEvent; -import org.apache.tez.dag.history.events.AMStartedEvent; -import org.apache.tez.dag.history.events.AppLaunchedEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; -import org.apache.tez.dag.history.events.ContainerStoppedEvent; -import org.apache.tez.dag.history.events.DAGFinishedEvent; -import org.apache.tez.dag.history.events.DAGInitializedEvent; -import org.apache.tez.dag.history.events.DAGStartedEvent; -import org.apache.tez.dag.history.events.DAGSubmittedEvent; -import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; -import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; -import org.apache.tez.dag.history.events.TaskFinishedEvent; -import org.apache.tez.dag.history.events.TaskStartedEvent; -import org.apache.tez.dag.history.events.VertexFinishedEvent; -import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; -import org.apache.tez.dag.history.events.VertexStartedEvent; -import org.apache.tez.dag.history.logging.EntityTypes; -import org.apache.tez.dag.history.utils.DAGUtils; -import org.apache.tez.dag.records.TezVertexID; - -public class HistoryEventTimelineConversion { - - public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) { - if (!historyEvent.isHistoryEvent()) { - throw new UnsupportedOperationException("Invalid Event, does not support history" - + ", eventType=" + historyEvent.getEventType()); - } - TimelineEntity timelineEntity; - switch (historyEvent.getEventType()) { - case APP_LAUNCHED: - timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); - break; - case AM_LAUNCHED: - timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); - break; - case AM_STARTED: - timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent); - break; - case CONTAINER_LAUNCHED: - timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); - break; - case CONTAINER_STOPPED: - timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); - break; - case DAG_SUBMITTED: - timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); - break; - case DAG_INITIALIZED: - timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); - break; - case DAG_STARTED: - timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent); - break; - case DAG_FINISHED: - timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); - break; - case VERTEX_INITIALIZED: - timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); - break; - case VERTEX_STARTED: - timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent); - break; - case VERTEX_FINISHED: - timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); - break; - case TASK_STARTED: - timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent); - break; - case TASK_FINISHED: - timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); - break; - case TASK_ATTEMPT_STARTED: - timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent); - break; - case TASK_ATTEMPT_FINISHED: - timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); - break; - case VERTEX_PARALLELISM_UPDATED: - timelineEntity = convertVertexParallelismUpdatedEvent( - (VertexParallelismUpdatedEvent) historyEvent); - break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - case VERTEX_COMMIT_STARTED: - case VERTEX_GROUP_COMMIT_STARTED: - case VERTEX_GROUP_COMMIT_FINISHED: - case DAG_COMMIT_STARTED: - throw new UnsupportedOperationException("Invalid Event, does not support history" - + ", eventType=" + historyEvent.getEventType()); - default: - throw new UnsupportedOperationException("Unhandled Event" - + ", eventType=" + historyEvent.getEventType()); - } - return timelineEntity; - } - - private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId("tez_" - + event.getApplicationId().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name()); - - atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, - event.getApplicationId().toString()); - atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - - atsEntity.addOtherInfo(ATSConstants.CONFIG, - DAGUtils.convertConfigurationToATSMap(event.getConf())); - - atsEntity.setStartTime(event.getLaunchTime()); - - if (event.getVersion() != null) { - atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION, - DAGUtils.convertTezVersionToATSMap(event.getVersion())); - } - - return atsEntity; - } - - private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId("tez_" - + event.getApplicationAttemptId().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); - - atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, - event.getApplicationAttemptId().toString()); - atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - - atsEntity.setStartTime(event.getLaunchTime()); - - TimelineEvent launchEvt = new TimelineEvent(); - launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name()); - launchEvt.setTimestamp(event.getLaunchTime()); - atsEntity.addEvent(launchEvt); - - atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); - - return atsEntity; - } - - private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId("tez_" - + event.getApplicationAttemptId().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - - TimelineEvent startEvt = new TimelineEvent(); - startEvt.setEventType(HistoryEventType.AM_STARTED.name()); - startEvt.setTimestamp(event.getStartTime()); - atsEntity.addEvent(startEvt); - - return atsEntity; - } - - private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId("tez_" - + event.getContainerId().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); - - atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), - "tez_" + event.getApplicationAttemptId().toString()); - atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, - event.getContainerId().toString()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - - atsEntity.setStartTime(event.getLaunchTime()); - - TimelineEvent launchEvt = new TimelineEvent(); - launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name()); - launchEvt.setTimestamp(event.getLaunchTime()); - atsEntity.addEvent(launchEvt); - - return atsEntity; - } - - private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId("tez_" - + event.getContainerId().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); - - // In case, a container is stopped in a different attempt - atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), - "tez_" + event.getApplicationAttemptId().toString()); - - TimelineEvent stoppedEvt = new TimelineEvent(); - stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name()); - stoppedEvt.setTimestamp(event.getStoppedTime()); - atsEntity.addEvent(stoppedEvt); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus()); - - atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); - atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); - - return atsEntity; - } - - private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getDagID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); - - TimelineEvent finishEvt = new TimelineEvent(); - finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name()); - finishEvt.setTimestamp(event.getFinishTime()); - atsEntity.addEvent(finishEvt); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); - atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); - atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); - - atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); - atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); - atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); - atsEntity.addOtherInfo(ATSConstants.COUNTERS, - DAGUtils.convertCountersToATSMap(event.getTezCounters())); - - final Map dagTaskStats = event.getDagTaskStats(); - if (dagTaskStats != null) { - for(Entry entry : dagTaskStats.entrySet()) { - atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); - } - } - - return atsEntity; - } - - private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getDagID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); - - TimelineEvent initEvt = new TimelineEvent(); - initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name()); - initEvt.setTimestamp(event.getInitTime()); - atsEntity.addEvent(initEvt); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); - atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); - - atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime()); - - if (event.getVertexNameIDMap() != null) { - Map nameIdStrMap = new TreeMap(); - for (Entry entry : event.getVertexNameIDMap().entrySet()) { - nameIdStrMap.put(entry.getKey(), entry.getValue().toString()); - } - atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap); - } - - return atsEntity; - } - - private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getDagID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); - - TimelineEvent startEvt = new TimelineEvent(); - startEvt.setEventType(HistoryEventType.DAG_STARTED.name()); - startEvt.setTimestamp(event.getStartTime()); - atsEntity.addEvent(startEvt); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); - atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); - - atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString()); - - return atsEntity; - } - - private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getDagID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); - - atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(), - "tez_" + event.getApplicationAttemptId().getApplicationId().toString()); - atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), - "tez_" + event.getApplicationAttemptId().toString()); - atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, - event.getApplicationAttemptId().toString()); - atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); - - TimelineEvent submitEvt = new TimelineEvent(); - submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name()); - submitEvt.setTimestamp(event.getSubmitTime()); - atsEntity.addEvent(submitEvt); - - atsEntity.setStartTime(event.getSubmitTime()); - - atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); - atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName()); - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getDagID().getApplicationId().toString()); - - try { - atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, - DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID, - event.getApplicationAttemptId().getApplicationId().toString()); - - return atsEntity; - } - - private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getTaskAttemptID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); - - TimelineEvent finishEvt = new TimelineEvent(); - finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name()); - finishEvt.setTimestamp(event.getFinishTime()); - atsEntity.addEvent(finishEvt); - - atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); - - atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); - atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); - if (event.getTaskAttemptError() != null) { - atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); - } - atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); - atsEntity.addOtherInfo(ATSConstants.COUNTERS, - DAGUtils.convertCountersToATSMap(event.getCounters())); - - return atsEntity; - } - - private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getTaskAttemptID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); - - atsEntity.setStartTime(event.getStartTime()); - - atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString()); - atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); - atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskAttemptID().getTaskID().getVertexID().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), - event.getTaskAttemptID().getTaskID().toString()); - - TimelineEvent startEvt = new TimelineEvent(); - startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name()); - startEvt.setTimestamp(event.getStartTime()); - atsEntity.addEvent(startEvt); - - atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); - atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); - atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); - atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); - atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); - atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); - - return atsEntity; - } - - private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getTaskID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskID().getVertexID().getDAGId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); - - TimelineEvent finishEvt = new TimelineEvent(); - finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name()); - finishEvt.setTimestamp(event.getFinishTime()); - atsEntity.addEvent(finishEvt); - - atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); - - atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); - atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); - if (event.getSuccessfulAttemptID() != null) { - atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID, - event.getSuccessfulAttemptID().toString()); - } - - atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); - atsEntity.addOtherInfo(ATSConstants.COUNTERS, - DAGUtils.convertCountersToATSMap(event.getTezCounters())); - - return atsEntity; - } - - private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getTaskID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); - - atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getTaskID().getVertexID().getDAGId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), - event.getTaskID().getVertexID().toString()); - - TimelineEvent startEvt = new TimelineEvent(); - startEvt.setEventType(HistoryEventType.TASK_STARTED.name()); - startEvt.setTimestamp(event.getStartTime()); - atsEntity.addEvent(startEvt); - - atsEntity.setStartTime(event.getStartTime()); - - atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); - atsEntity.addOtherInfo(ATSConstants.STATUS, TaskState.SCHEDULED.name()); - - return atsEntity; - } - - private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getVertexID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); - - TimelineEvent finishEvt = new TimelineEvent(); - finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name()); - finishEvt.setTimestamp(event.getFinishTime()); - atsEntity.addEvent(finishEvt); - - atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); - - atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); - atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); - - atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); - atsEntity.addOtherInfo(ATSConstants.COUNTERS, - DAGUtils.convertCountersToATSMap(event.getTezCounters())); - atsEntity.addOtherInfo(ATSConstants.STATS, - DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); - - final Map vertexTaskStats = event.getVertexTaskStats(); - if (vertexTaskStats != null) { - for(Entry entry : vertexTaskStats.entrySet()) { - atsEntity.addOtherInfo(entry.getKey(), entry.getValue()); - } - } - - return atsEntity; - } - - private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getVertexID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); - - atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); - - TimelineEvent initEvt = new TimelineEvent(); - initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name()); - initEvt.setTimestamp(event.getInitedTime()); - atsEntity.addEvent(initEvt); - - atsEntity.setStartTime(event.getInitedTime()); - - atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); - atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime()); - atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime()); - atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); - atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); - - return atsEntity; - } - - private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getVertexID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); - - TimelineEvent startEvt = new TimelineEvent(); - startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name()); - startEvt.setTimestamp(event.getStartTime()); - atsEntity.addEvent(startEvt); - - atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); - atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString()); - - return atsEntity; - } - - private static TimelineEntity convertVertexParallelismUpdatedEvent( - VertexParallelismUpdatedEvent event) { - TimelineEntity atsEntity = new TimelineEntity(); - atsEntity.setEntityId(event.getVertexID().toString()); - atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); - - atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, - event.getVertexID().getDAGId().getApplicationId().toString()); - atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), - event.getVertexID().getDAGId().toString()); - - TimelineEvent updateEvt = new TimelineEvent(); - updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); - updateEvt.setTimestamp(event.getUpdateTime()); - - Map eventInfo = new HashMap(); - if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { - Map updatedEdgeManagers = new HashMap(); - for (Entry entry : - event.getSourceEdgeManagers().entrySet()) { - updatedEdgeManagers.put(entry.getKey(), - DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())); - } - eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); - } - eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); - eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); - updateEvt.setEventInfo(eventInfo); - atsEntity.addEvent(updateEvt); - - atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); - - return atsEntity; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java deleted file mode 100644 index 18ec43e..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.DAGStartedEvent; -import org.apache.tez.dag.records.TezDAGID; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestATSHistoryLoggingService { - - private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class); - - private ATSHistoryLoggingService atsHistoryLoggingService; - private AppContext appContext; - private Configuration conf; - private int atsInvokeCounter; - private int atsEntitiesCounter; - private SystemClock clock = new SystemClock(); - - @Before - public void setup() throws Exception { - appContext = mock(AppContext.class); - atsHistoryLoggingService = new ATSHistoryLoggingService(); - atsHistoryLoggingService.setAppContext(appContext); - conf = new Configuration(false); - conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, - 1000l); - conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2); - conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); - atsInvokeCounter = 0; - atsEntitiesCounter = 0; - atsHistoryLoggingService.init(conf); - atsHistoryLoggingService.timelineClient = mock(TimelineClient.class); - atsHistoryLoggingService.start(); - when(appContext.getClock()).thenReturn(clock); - when(appContext.getCurrentDAGID()).thenReturn(null); - when(atsHistoryLoggingService.timelineClient.putEntities( - Matchers.anyVararg())).thenAnswer( - new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - ++atsInvokeCounter; - atsEntitiesCounter += invocation.getArguments().length; - try { - Thread.sleep(500l); - } catch (InterruptedException e) { - // do nothing - } - return null; - } - } - ); - } - - @After - public void teardown() { - atsHistoryLoggingService.stop(); - atsHistoryLoggingService = null; - } - - @Test(timeout=20000) - public void testATSHistoryLoggingServiceShutdown() { - TezDAGID tezDAGID = TezDAGID.getInstance( - ApplicationId.newInstance(100l, 1), 1); - DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, - new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); - - for (int i = 0; i < 100; ++i) { - atsHistoryLoggingService.handle(historyEvent); - } - - try { - Thread.sleep(2500l); - } catch (InterruptedException e) { - // Do nothing - } - atsHistoryLoggingService.stop(); - - LOG.info("ATS entitiesSent=" + atsEntitiesCounter - + ", timelineInvocations=" + atsInvokeCounter); - - Assert.assertTrue(atsEntitiesCounter >= 4); - Assert.assertTrue(atsEntitiesCounter < 20); - - } - - @Test(timeout=20000) - public void testATSEventBatching() { - TezDAGID tezDAGID = TezDAGID.getInstance( - ApplicationId.newInstance(100l, 1), 1); - DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, - new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); - - for (int i = 0; i < 100; ++i) { - atsHistoryLoggingService.handle(historyEvent); - } - - try { - Thread.sleep(1000l); - } catch (InterruptedException e) { - // Do nothing - } - LOG.info("ATS entitiesSent=" + atsEntitiesCounter - + ", timelineInvocations=" + atsInvokeCounter); - - Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter); - Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter); - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java deleted file mode 100644 index 9c4f721..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import java.io.IOException; -import java.util.Random; - -import javax.ws.rs.core.MediaType; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.tez.client.TezClient; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.runtime.library.processor.SleepProcessor; -import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; -import org.apache.tez.tests.MiniTezClusterWithTimeline; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; - -public class TestATSHistoryWithMiniCluster { - - private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class); - - protected static MiniTezClusterWithTimeline mrrTezCluster = null; - protected static MiniDFSCluster dfsCluster = null; - private static String timelineAddress; - private Random random = new Random(); - - private static Configuration conf = new Configuration(); - private static FileSystem remoteFs; - - private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR - + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir"; - - @BeforeClass - public static void setup() throws IOException { - try { - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) - .build(); - remoteFs = dfsCluster.getFileSystem(); - } catch (IOException io) { - throw new RuntimeException("problem starting mini dfs cluster", io); - } - - if (mrrTezCluster == null) { - try { - mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(), - 1, 1, 1, true); - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS - conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000); - mrrTezCluster.init(conf); - mrrTezCluster.start(); - } catch (Throwable e) { - LOG.info("Failed to start Mini Tez Cluster", e); - } - } - timelineAddress = mrrTezCluster.getConfig().get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS); - if (timelineAddress != null) { - // Hack to handle bug in MiniYARNCluster handling of webapp address - timelineAddress = timelineAddress.replace("0.0.0.0", "localhost"); - } - } - - @AfterClass - public static void tearDown() throws InterruptedException { - LOG.info("Shutdown invoked"); - Thread.sleep(10000); - if (mrrTezCluster != null) { - mrrTezCluster.stop(); - mrrTezCluster = null; - } - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } - } - - // To be replaced after Timeline has java APIs for domains - private K getTimelineData(String url, Class clazz) { - Client client = new Client(); - WebResource resource = client.resource(url); - - ClientResponse response = resource.accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); - - K entity = response.getEntity(clazz); - Assert.assertNotNull(entity); - return entity; - } - - @Test (timeout=50000) - public void testSimpleAMACls() throws Exception { - TezClient tezSession = null; - ApplicationId applicationId; - try { - SleepProcessorConfig spConf = new SleepProcessorConfig(1); - - DAG dag = DAG.create("TezSleepProcessor"); - Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag.addVertex(vertex); - - TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); - tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); - tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - ATSHistoryLoggingService.class.getName()); - Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random - .nextInt(100000)))); - remoteFs.mkdirs(remoteStagingDir); - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - - tezSession = TezClient.create("TezSleepProcessor", tezConf, true); - tezSession.start(); - - applicationId = tezSession.getAppMasterApplicationId(); - - DAGClient dagClient = tezSession.submitDAG(dag); - - DAGStatus dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); - } finally { - if (tezSession != null) { - tezSession.stop(); - } - } - -// verifyEntityExistence(applicationId); - } - - @Test (timeout=50000) - public void testDAGACls() throws Exception { - TezClient tezSession = null; - ApplicationId applicationId; - try { - SleepProcessorConfig spConf = new SleepProcessorConfig(1); - - DAG dag = DAG.create("TezSleepProcessor"); - Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag.addVertex(vertex); - - TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); - tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); - tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - ATSHistoryLoggingService.class.getName()); - Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random - .nextInt(100000)))); - remoteFs.mkdirs(remoteStagingDir); - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - - tezSession = TezClient.create("TezSleepProcessor", tezConf, true); - tezSession.start(); - - applicationId = tezSession.getAppMasterApplicationId(); - - DAGClient dagClient = tezSession.submitDAG(dag); - - DAGStatus dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); - } finally { - if (tezSession != null) { - tezSession.stop(); - } - } -// verifyEntityExistence(applicationId); - } - - private void verifyEntityExistence(ApplicationId applicationId) { - Assert.assertNotNull(timelineAddress); - - String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/" - + "tez_" + applicationId.toString() + "?fields=otherinfo"; - LOG.info("Getting timeline entity for tez application: " + appUrl); - TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class); - Assert.assertNotNull(appEntity); - - TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1); - String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" - + tezDAGID.toString() + "?fields=otherinfo"; - LOG.info("Getting timeline entity for tez dag: " + dagUrl); - TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class); - Assert.assertNotNull(dagEntity); - } - - -}