Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AD5BAC1E5 for ; Fri, 19 Dec 2014 01:27:45 +0000 (UTC) Received: (qmail 55500 invoked by uid 500); 19 Dec 2014 01:27:45 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 55399 invoked by uid 500); 19 Dec 2014 01:27:45 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 55322 invoked by uid 99); 19 Dec 2014 01:27:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Dec 2014 01:27:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3470A9CB49A; Fri, 19 Dec 2014 01:27:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Date: Fri, 19 Dec 2014 01:27:47 -0000 Message-Id: <3f87a6a704f74c709120caafedeb63d6@git.apache.org> In-Reply-To: <554fb219698e48bda0d3aa673787f018@git.apache.org> References: <554fb219698e48bda0d3aa673787f018@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] tez git commit: TEZ-1868. Document how to do Windows builds due to with ACL symlink build changes. (hitesh) TEZ-1868. Document how to do Windows builds due to with ACL symlink build changes. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/20441ab0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/20441ab0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/20441ab0 Branch: refs/heads/master Commit: 20441ab09c93dce03220f64cc6c51c8ec6359f7c Parents: 059f7bc Author: Hitesh Shah Authored: Thu Dec 18 17:20:53 2014 -0800 Committer: Hitesh Shah Committed: Thu Dec 18 17:27:38 2014 -0800 ---------------------------------------------------------------------- .../logging/ats/ATSHistoryLoggingService.java | 325 ++++++++++ .../ats/HistoryEventTimelineConversion.java | 601 +++++++++++++++++++ .../tez/tests/MiniTezClusterWithTimeline.java | 253 ++++++++ .../logging/ats/ATSHistoryLoggingService.java | 325 ---------- .../ats/HistoryEventTimelineConversion.java | 601 ------------------- .../tez/tests/MiniTezClusterWithTimeline.java | 253 -------- 6 files changed, 1179 insertions(+), 1179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/20441ab0/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java new file mode 100644 index 0000000..c68d395 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; + +import com.google.common.annotations.VisibleForTesting; + +public class ATSHistoryLoggingService extends HistoryLoggingService { + + private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class); + + private LinkedBlockingQueue eventQueue = + new LinkedBlockingQueue(); + + private Thread eventHandlingThread; + private AtomicBoolean stopped = new AtomicBoolean(false); + private int eventCounter = 0; + private int eventsProcessed = 0; + private final Object lock = new Object(); + + @VisibleForTesting + TimelineClient timelineClient; + + private HashSet skippedDAGs = new HashSet(); + private Map dagDomainIdMap = new HashMap(); + private long maxTimeToWaitOnShutdown; + private boolean waitForeverOnShutdown = false; + + private int maxEventsPerBatch; + private long maxPollingTimeMillis; + + private String sessionDomainId; + private static final String atsHistoryACLManagerClassName = + "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; + private HistoryACLPolicyManager historyACLPolicyManager; + + public ATSHistoryLoggingService() { + super(ATSHistoryLoggingService.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing ATSService"); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + maxTimeToWaitOnShutdown = conf.getLong( + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); + maxEventsPerBatch = conf.getInt( + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, + TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT); + maxPollingTimeMillis = conf.getInt( + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); + if (maxTimeToWaitOnShutdown < 0) { + waitForeverOnShutdown = true; + } + sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); + + LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); + try { + historyACLPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + historyACLPolicyManager.setConf(conf); + } catch (TezUncheckedException e) { + LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName + + ". ACLs cannot be enforced correctly for history data in Timeline", e); + if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, + TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { + throw e; + } + historyACLPolicyManager = null; + } + + } + + @Override + public void serviceStart() { + LOG.info("Starting ATSService"); + timelineClient.start(); + + eventHandlingThread = new Thread(new Runnable() { + @Override + public void run() { + List events = new LinkedList(); + boolean interrupted = false; + while (!stopped.get() && !Thread.currentThread().isInterrupted() + && !interrupted) { + + // Log the size of the event-queue every so often. + if (eventCounter != 0 && eventCounter % 1000 == 0) { + if (eventsProcessed != 0 && !events.isEmpty()) { + LOG.info("Event queue stats" + + ", eventsProcessedSinceLastUpdate=" + eventsProcessed + + ", eventQueueSize=" + eventQueue.size()); + } + eventCounter = 0; + eventsProcessed = 0; + } else { + ++eventCounter; + } + + synchronized (lock) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + // Finish processing events and then return + interrupted = true; + } + + if (events.isEmpty()) { + continue; + } + + eventsProcessed += events.size(); + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling events", e); + } + } + } + } + }, "HistoryEventHandlingThread"); + eventHandlingThread.start(); + } + + @Override + public void serviceStop() { + LOG.info("Stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + stopped.set(true); + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + } + synchronized (lock) { + if (!eventQueue.isEmpty()) { + LOG.warn("ATSService being stopped" + + ", eventQueueBacklog=" + eventQueue.size() + + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown + + ", waitForever=" + waitForeverOnShutdown); + long startTime = appContext.getClock().getTime(); + long endTime = startTime + maxTimeToWaitOnShutdown; + List events = new LinkedList(); + while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) { + try { + getEventBatch(events); + } catch (InterruptedException e) { + LOG.info("ATSService interrupted while shutting down. Exiting." + + " EventQueueBacklog=" + eventQueue.size()); + } + if (events.isEmpty()) { + LOG.info("Event queue empty, stopping ATS Service"); + break; + } + try { + handleEvents(events); + } catch (Exception e) { + LOG.warn("Error handling event", e); + break; + } + } + } + } + if (!eventQueue.isEmpty()) { + LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + } + timelineClient.stop(); + } + + private void getEventBatch(List events) throws InterruptedException { + events.clear(); + int counter = 0; + while (counter < maxEventsPerBatch) { + DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); + if (event == null) { + break; + } + if (!isValidEvent(event)) { + continue; + } + ++counter; + events.add(event); + if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) { + // Special case this as it might be a large payload + break; + } + } + } + + + public void handle(DAGHistoryEvent event) { + eventQueue.add(event); + } + + private boolean isValidEvent(DAGHistoryEvent event) { + HistoryEventType eventType = event.getHistoryEvent().getEventType(); + TezDAGID dagId = event.getDagID(); + + if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { + DAGSubmittedEvent dagSubmittedEvent = + (DAGSubmittedEvent) event.getHistoryEvent(); + String dagName = dagSubmittedEvent.getDAGName(); + if (dagName != null + && dagName.startsWith( + TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { + // Skip recording pre-warm DAG events + skippedDAGs.add(dagId); + return false; + } + if (historyACLPolicyManager != null) { + String dagDomainId = dagSubmittedEvent.getConf().get( + TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); + if (dagDomainId != null) { + dagDomainIdMap.put(dagId, dagDomainId); + } + } + } + if (eventType.equals(HistoryEventType.DAG_FINISHED)) { + // Remove from set to keep size small + // No more events should be seen after this point. + if (skippedDAGs.remove(dagId)) { + return false; + } + } + + if (dagId != null && skippedDAGs.contains(dagId)) { + // Skip pre-warm DAGs + return false; + } + + return true; + } + + private void handleEvents(List events) { + TimelineEntity[] entities = new TimelineEntity[events.size()]; + for (int i = 0; i < events.size(); ++i) { + DAGHistoryEvent event = events.get(i); + String domainId = sessionDomainId; + TezDAGID dagId = event.getDagID(); + + if (historyACLPolicyManager != null && dagId != null) { + if (dagDomainIdMap.containsKey(dagId)) { + domainId = dagDomainIdMap.get(dagId); + } + } + + entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending event batch to Timeline, batchSize=" + events.size()); + } + try { + TimelinePutResponse response = + timelineClient.putEntities(entities); + if (response != null + && !response.getErrors().isEmpty()) { + int count = response.getErrors().size(); + for (int i = 0; i < count; ++i) { + TimelinePutError err = response.getErrors().get(i); + if (err.getErrorCode() != 0) { + LOG.warn("Could not post history event to ATS" + + ", atsPutError=" + err.getErrorCode() + + ", entityId=" + entities[i].getEntityId() + + ", eventType=" + events.get(i).getHistoryEvent().getEventType()); + } + } + } + // Do nothing additional, ATS client library should handle throttling + // or auto-disable as needed + } catch (Exception e) { + LOG.warn("Could not handle history events", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/20441ab0/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java new file mode 100644 index 0000000..91346ae --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -0,0 +1,601 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezVertexID; + +public class HistoryEventTimelineConversion { + + public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) { + if (!historyEvent.isHistoryEvent()) { + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + } + TimelineEntity timelineEntity = null; + switch (historyEvent.getEventType()) { + case APP_LAUNCHED: + timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); + break; + case AM_LAUNCHED: + timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); + break; + case AM_STARTED: + timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent); + break; + case CONTAINER_LAUNCHED: + timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); + break; + case CONTAINER_STOPPED: + timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); + break; + case DAG_SUBMITTED: + timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); + break; + case DAG_INITIALIZED: + timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); + break; + case DAG_STARTED: + timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent); + break; + case DAG_FINISHED: + timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); + break; + case VERTEX_INITIALIZED: + timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); + break; + case VERTEX_STARTED: + timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent); + break; + case VERTEX_FINISHED: + timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); + break; + case TASK_STARTED: + timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent); + break; + case TASK_FINISHED: + timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); + break; + case TASK_ATTEMPT_STARTED: + timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent); + break; + case TASK_ATTEMPT_FINISHED: + timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); + break; + case VERTEX_PARALLELISM_UPDATED: + timelineEntity = convertVertexParallelismUpdatedEvent( + (VertexParallelismUpdatedEvent) historyEvent); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_COMMIT_STARTED: + throw new UnsupportedOperationException("Invalid Event, does not support history" + + ", eventType=" + historyEvent.getEventType()); + default: + throw new UnsupportedOperationException("Unhandled Event" + + ", eventType=" + historyEvent.getEventType()); + } + return timelineEntity; + } + + private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + + atsEntity.addOtherInfo(ATSConstants.CONFIG, + DAGUtils.convertConfigurationToATSMap(event.getConf())); + + atsEntity.setStartTime(event.getLaunchTime()); + + return atsEntity; + } + + private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); + + return atsEntity; + } + + private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getApplicationAttemptId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.AM_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + return atsEntity; + } + + private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, + event.getContainerId().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + atsEntity.setStartTime(event.getLaunchTime()); + + TimelineEvent launchEvt = new TimelineEvent(); + launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name()); + launchEvt.setTimestamp(event.getLaunchTime()); + atsEntity.addEvent(launchEvt); + + return atsEntity; + } + + private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId("tez_" + + event.getContainerId().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name()); + + // In case, a container is stopped in a different attempt + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + TimelineEvent stoppedEvt = new TimelineEvent(); + stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name()); + stoppedEvt.setTimestamp(event.getStoppedTime()); + atsEntity.addEvent(stoppedEvt); + + atsEntity.addPrimaryFilter(ATSConstants.EXIT_STATUS, event.getExitStatus()); + + atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); + + return atsEntity; + } + + private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + final Map dagTaskStats = event.getDagTaskStats(); + if (dagTaskStats != null) { + for(Entry entry : dagTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue()); + } + } + + return atsEntity; + } + + private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitTime()); + atsEntity.addEvent(initEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime()); + + if (event.getVertexNameIDMap() != null) { + Map nameIdStrMap = new TreeMap(); + for (Entry entry : event.getVertexNameIDMap().entrySet()) { + nameIdStrMap.put(entry.getKey(), entry.getValue().toString()); + } + atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap); + } + + return atsEntity; + } + + private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.DAG_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString()); + + return atsEntity; + } + + private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getDagID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION.name(), + "tez_" + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser()); + + TimelineEvent submitEvt = new TimelineEvent(); + submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name()); + submitEvt.setTimestamp(event.getSubmitTime()); + atsEntity.addEvent(submitEvt); + + atsEntity.setStartTime(event.getSubmitTime()); + + atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser()); + atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName()); + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getDagID().getApplicationId().toString()); + + try { + atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + + return atsEntity; + } + + private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getTaskAttemptError() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getCounters())); + + return atsEntity; + } + + private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskAttemptID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskAttemptID().getTaskID().getVertexID().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + + return atsEntity; + } + + private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getSuccessfulAttemptID() != null) { + atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID, + event.getSuccessfulAttemptID().toString()); + } + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + return atsEntity; + } + + private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getTaskID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getTaskID().getVertexID().getDAGId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.TASK_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.setStartTime(event.getStartTime()); + + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + + return atsEntity; + } + + private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent finishEvt = new TimelineEvent(); + finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name()); + finishEvt.setTimestamp(event.getFinishTime()); + atsEntity.addEvent(finishEvt); + + atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + + atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + atsEntity.addOtherInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + atsEntity.addOtherInfo(ATSConstants.STATS, + DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + + final Map vertexTaskStats = event.getVertexTaskStats(); + if (vertexTaskStats != null) { + for(Entry entry : vertexTaskStats.entrySet()) { + atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue()); + } + } + + return atsEntity; + } + + private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent initEvt = new TimelineEvent(); + initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name()); + initEvt.setTimestamp(event.getInitedTime()); + atsEntity.addEvent(initEvt); + + atsEntity.setStartTime(event.getInitedTime()); + + atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); + atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime()); + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + + return atsEntity; + } + + private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, + event.getVertexID().getDAGId().getApplicationId().toString()); + atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + + TimelineEvent startEvt = new TimelineEvent(); + startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name()); + startEvt.setTimestamp(event.getStartTime()); + atsEntity.addEvent(startEvt); + + atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); + atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); + atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString()); + + return atsEntity; + } + + private static TimelineEntity convertVertexParallelismUpdatedEvent( + VertexParallelismUpdatedEvent event) { + TimelineEntity atsEntity = new TimelineEntity(); + atsEntity.setEntityId(event.getVertexID().toString()); + atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); + + TimelineEvent updateEvt = new TimelineEvent(); + updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); + updateEvt.setTimestamp(event.getUpdateTime()); + + Map eventInfo = new HashMap(); + if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { + Map updatedEdgeManagers = new HashMap(); + for (Entry entry : + event.getSourceEdgeManagers().entrySet()) { + updatedEdgeManagers.put(entry.getKey(), + DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())); + } + eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); + } + eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); + eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); + updateEvt.setEventInfo(eventInfo); + atsEntity.addEvent(updateEvt); + + atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + + return atsEntity; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/20441ab0/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java new file mode 100644 index 0000000..d48948b --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java @@ -0,0 +1,253 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.tests; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * Configures and starts the Tez-specific components in the YARN cluster. + * + * When using this mini cluster, the user is expected to + */ +public class MiniTezClusterWithTimeline extends MiniYARNCluster { + + public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class); + + private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class); + + private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml"; + + private Path confFilePath; + + public MiniTezClusterWithTimeline(String testName) { + this(testName, 1); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs) { + super(testName, noOfNMs, 4, 4); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs) { + super(testName, noOfNMs, numLocalDirs, numLogDirs); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs, boolean enableAHS) { + super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME); + // Use libs from cluster since no build is available + conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true); + // blacklisting disabled to prevent scheduling issues + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), + "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath()); + } + + if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) { + // nothing defined. set quick delete value + conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); + } + + File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR); + + if (!appJarLocalFile.exists()) { + String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR + + " not found. Exiting."; + LOG.info(message); + throw new TezUncheckedException(message); + } else { + LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath()); + } + + FileSystem fs = FileSystem.get(conf); + Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir")); + Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar"); + // Copy AppJar and make it public. + Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR); + fs.copyFromLocalFile(appMasterJar, appRemoteJar); + fs.setPermission(appRemoteJar, new FsPermission("777")); + + conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString()); + LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS)); + + // VMEM monitoring disabled, PMEM monitoring enabled. + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + + try { + Path stagingPath = FileContext.getFileContext(conf).makeQualified( + new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); + /* + * Re-configure the staging path on Windows if the file system is localFs. + * We need to use a absolute path that contains the drive letter. The unit + * test could run on a different drive than the AM. We can run into the + * issue that job files are localized to the drive where the test runs on, + * while the AM starts on a different drive and fails to find the job + * metafiles. Using absolute path can avoid this ambiguity. + */ + if (Path.WINDOWS) { + if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, + new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR)) + .getAbsolutePath()); + } + } + FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf); + if (fc.util().exists(stagingPath)) { + LOG.info(stagingPath + " exists! deleting..."); + fc.delete(stagingPath, true); + } + LOG.info("mkdir: " + stagingPath); + fc.mkdir(stagingPath, null, true); + + //mkdir done directory as well + String doneDir = + JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + Path doneDirPath = fc.makeQualified(new Path(doneDir)); + fc.mkdir(doneDirPath, null, true); + } catch (IOException e) { + throw new TezUncheckedException("Could not create staging directory. ", e); + } + conf.set(MRConfig.MASTER_ADDRESS, "test"); + + //configure the shuffle service in NM + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, + Service.class); + + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + + conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, + DefaultContainerExecutor.class, ContainerExecutor.class); + + // TestMRJobs is for testing non-uberized operation only; see TestUberAM + // for corresponding uberized tests. + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting MiniTezClusterWithTimeline"); + super.serviceStart(); + File workDir = super.getTestWorkDir(); + Configuration conf = super.getConfig(); + + confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); + File confFile = new File(confFilePath.toString()); + try { + confFile.createNewFile(); + conf.writeXml(new FileOutputStream(confFile)); + confFile.deleteOnExit(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new RuntimeException(e); + } + confFilePath = new Path(confFile.getAbsolutePath()); + conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + workDir.getAbsolutePath(), System.getProperty("java.class.path")); + LOG.info("Setting yarn-site.xml via YARN-APP-CP at: " + + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)); + } + + @Override + protected void serviceStop() throws Exception { + waitForAppsToFinish(); + super.serviceStop(); + } + + private void waitForAppsToFinish() { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(getConfig()); + yarnClient.start(); + try { + while(true) { + List appReports = yarnClient.getApplications(); + Collection unCompletedApps = Collections2.filter(appReports, new Predicate(){ + @Override + public boolean apply(ApplicationReport appReport) { + return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + .contains(appReport.getYarnApplicationState()); + } + }); + if (unCompletedApps.size()==0){ + break; + } + LOG.info("wait for applications to finish in MiniTezClusterWithTimeline"); + Thread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + yarnClient.stop(); + } + } + + public Path getConfigFilePath() { + return confFilePath; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/20441ab0/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java deleted file mode 100644 index c68d395..0000000 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ /dev/null @@ -1,325 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.security.HistoryACLPolicyManager; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.DAGSubmittedEvent; -import org.apache.tez.dag.history.logging.HistoryLoggingService; -import org.apache.tez.dag.records.TezDAGID; - -import com.google.common.annotations.VisibleForTesting; - -public class ATSHistoryLoggingService extends HistoryLoggingService { - - private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class); - - private LinkedBlockingQueue eventQueue = - new LinkedBlockingQueue(); - - private Thread eventHandlingThread; - private AtomicBoolean stopped = new AtomicBoolean(false); - private int eventCounter = 0; - private int eventsProcessed = 0; - private final Object lock = new Object(); - - @VisibleForTesting - TimelineClient timelineClient; - - private HashSet skippedDAGs = new HashSet(); - private Map dagDomainIdMap = new HashMap(); - private long maxTimeToWaitOnShutdown; - private boolean waitForeverOnShutdown = false; - - private int maxEventsPerBatch; - private long maxPollingTimeMillis; - - private String sessionDomainId; - private static final String atsHistoryACLManagerClassName = - "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; - private HistoryACLPolicyManager historyACLPolicyManager; - - public ATSHistoryLoggingService() { - super(ATSHistoryLoggingService.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing ATSService"); - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); - maxTimeToWaitOnShutdown = conf.getLong( - TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, - TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); - maxEventsPerBatch = conf.getInt( - TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, - TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT); - maxPollingTimeMillis = conf.getInt( - TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, - TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); - if (maxTimeToWaitOnShutdown < 0) { - waitForeverOnShutdown = true; - } - sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); - - LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); - try { - historyACLPolicyManager = ReflectionUtils.createClazzInstance( - atsHistoryACLManagerClassName); - historyACLPolicyManager.setConf(conf); - } catch (TezUncheckedException e) { - LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName - + ". ACLs cannot be enforced correctly for history data in Timeline", e); - if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, - TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { - throw e; - } - historyACLPolicyManager = null; - } - - } - - @Override - public void serviceStart() { - LOG.info("Starting ATSService"); - timelineClient.start(); - - eventHandlingThread = new Thread(new Runnable() { - @Override - public void run() { - List events = new LinkedList(); - boolean interrupted = false; - while (!stopped.get() && !Thread.currentThread().isInterrupted() - && !interrupted) { - - // Log the size of the event-queue every so often. - if (eventCounter != 0 && eventCounter % 1000 == 0) { - if (eventsProcessed != 0 && !events.isEmpty()) { - LOG.info("Event queue stats" - + ", eventsProcessedSinceLastUpdate=" + eventsProcessed - + ", eventQueueSize=" + eventQueue.size()); - } - eventCounter = 0; - eventsProcessed = 0; - } else { - ++eventCounter; - } - - synchronized (lock) { - try { - getEventBatch(events); - } catch (InterruptedException e) { - // Finish processing events and then return - interrupted = true; - } - - if (events.isEmpty()) { - continue; - } - - eventsProcessed += events.size(); - try { - handleEvents(events); - } catch (Exception e) { - LOG.warn("Error handling events", e); - } - } - } - } - }, "HistoryEventHandlingThread"); - eventHandlingThread.start(); - } - - @Override - public void serviceStop() { - LOG.info("Stopping ATSService" - + ", eventQueueBacklog=" + eventQueue.size()); - stopped.set(true); - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); - } - synchronized (lock) { - if (!eventQueue.isEmpty()) { - LOG.warn("ATSService being stopped" - + ", eventQueueBacklog=" + eventQueue.size() - + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown - + ", waitForever=" + waitForeverOnShutdown); - long startTime = appContext.getClock().getTime(); - long endTime = startTime + maxTimeToWaitOnShutdown; - List events = new LinkedList(); - while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) { - try { - getEventBatch(events); - } catch (InterruptedException e) { - LOG.info("ATSService interrupted while shutting down. Exiting." - + " EventQueueBacklog=" + eventQueue.size()); - } - if (events.isEmpty()) { - LOG.info("Event queue empty, stopping ATS Service"); - break; - } - try { - handleEvents(events); - } catch (Exception e) { - LOG.warn("Error handling event", e); - break; - } - } - } - } - if (!eventQueue.isEmpty()) { - LOG.warn("Did not finish flushing eventQueue before stopping ATSService" - + ", eventQueueBacklog=" + eventQueue.size()); - } - timelineClient.stop(); - } - - private void getEventBatch(List events) throws InterruptedException { - events.clear(); - int counter = 0; - while (counter < maxEventsPerBatch) { - DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); - if (event == null) { - break; - } - if (!isValidEvent(event)) { - continue; - } - ++counter; - events.add(event); - if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) { - // Special case this as it might be a large payload - break; - } - } - } - - - public void handle(DAGHistoryEvent event) { - eventQueue.add(event); - } - - private boolean isValidEvent(DAGHistoryEvent event) { - HistoryEventType eventType = event.getHistoryEvent().getEventType(); - TezDAGID dagId = event.getDagID(); - - if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { - DAGSubmittedEvent dagSubmittedEvent = - (DAGSubmittedEvent) event.getHistoryEvent(); - String dagName = dagSubmittedEvent.getDAGName(); - if (dagName != null - && dagName.startsWith( - TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { - // Skip recording pre-warm DAG events - skippedDAGs.add(dagId); - return false; - } - if (historyACLPolicyManager != null) { - String dagDomainId = dagSubmittedEvent.getConf().get( - TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); - if (dagDomainId != null) { - dagDomainIdMap.put(dagId, dagDomainId); - } - } - } - if (eventType.equals(HistoryEventType.DAG_FINISHED)) { - // Remove from set to keep size small - // No more events should be seen after this point. - if (skippedDAGs.remove(dagId)) { - return false; - } - } - - if (dagId != null && skippedDAGs.contains(dagId)) { - // Skip pre-warm DAGs - return false; - } - - return true; - } - - private void handleEvents(List events) { - TimelineEntity[] entities = new TimelineEntity[events.size()]; - for (int i = 0; i < events.size(); ++i) { - DAGHistoryEvent event = events.get(i); - String domainId = sessionDomainId; - TezDAGID dagId = event.getDagID(); - - if (historyACLPolicyManager != null && dagId != null) { - if (dagDomainIdMap.containsKey(dagId)) { - domainId = dagDomainIdMap.get(dagId); - } - } - - entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); - if (historyACLPolicyManager != null) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); - } - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Sending event batch to Timeline, batchSize=" + events.size()); - } - try { - TimelinePutResponse response = - timelineClient.putEntities(entities); - if (response != null - && !response.getErrors().isEmpty()) { - int count = response.getErrors().size(); - for (int i = 0; i < count; ++i) { - TimelinePutError err = response.getErrors().get(i); - if (err.getErrorCode() != 0) { - LOG.warn("Could not post history event to ATS" - + ", atsPutError=" + err.getErrorCode() - + ", entityId=" + entities[i].getEntityId() - + ", eventType=" + events.get(i).getHistoryEvent().getEventType()); - } - } - } - // Do nothing additional, ATS client library should handle throttling - // or auto-disable as needed - } catch (Exception e) { - LOG.warn("Could not handle history events", e); - } - } - -}