Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4A75317C2D for ; Tue, 28 Apr 2015 20:40:43 +0000 (UTC) Received: (qmail 94839 invoked by uid 500); 28 Apr 2015 20:40:43 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 94773 invoked by uid 500); 28 Apr 2015 20:40:43 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 94399 invoked by uid 99); 28 Apr 2015 20:40:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 20:40:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C9DB7E0914; Tue, 28 Apr 2015 20:40:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Tue, 28 Apr 2015 20:40:56 -0000 Message-Id: <8287bef74a194dfd904c97e9b9abe919@git.apache.org> In-Reply-To: <3aafb2e5bec84d3a83b6d778ccee3724@git.apache.org> References: <3aafb2e5bec84d3a83b6d778ccee3724@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/37] tez git commit: TEZ-2226. Disable writing history to timeline if domain creation fails. (Chang Li via hitesh) TEZ-2226. Disable writing history to timeline if domain creation fails. (Chang Li via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5b2f011f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5b2f011f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5b2f011f Branch: refs/heads/TEZ-2003 Commit: 5b2f011f105d78238a3829218299e39594dc1560 Parents: ecf8c43 Author: Hitesh Shah Authored: Tue Apr 28 09:39:17 2015 -0700 Committer: Hitesh Shah Committed: Tue Apr 28 09:39:17 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 29 +- .../org/apache/tez/client/TezClientUtils.java | 31 +- .../security/HistoryACLPolicyException.java | 34 ++ .../security/HistoryACLPolicyManager.java | 7 +- .../main/java/org/apache/tez/dag/api/DAG.java | 6 + .../apache/tez/dag/api/TezConfiguration.java | 20 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 8 + .../dag/history/events/DAGRecoveredEvent.java | 10 + .../dag/history/events/DAGSubmittedEvent.java | 9 +- .../ats/acls/ATSHistoryACLPolicyManager.java | 15 +- .../ats/acls/TestATSHistoryWithACLs.java | 339 ++++++++++++++++++- .../logging/ats/ATSHistoryLoggingService.java | 29 +- 13 files changed, 509 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ea191d..e676cde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -151,6 +151,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2226. Disable writing history to timeline if domain creation fails. TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI. TEZ-2365. Update tez-ui war's license/notice to reflect OFL license correctly. TEZ-2329. UI Query on final dag status performance improvement http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index bf157d1..4f57d5e 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -61,6 +61,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto; import org.apache.tez.dag.api.client.DAGClientImpl; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.common.security.HistoryACLPolicyException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -286,6 +287,12 @@ public class TezClient { amConfig.setCredentials(credentials); } + @Private + @VisibleForTesting + public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) { + historyACLPolicyManager = myAclPolicyManager; + } + /** * Start the client. This establishes a connection to the YARN cluster. * In session mode, this start the App Master thats runs all the DAGs in the @@ -300,9 +307,11 @@ public class TezClient { frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration()); frameworkClient.start(); + ///need additional check for historyACLPolicyManager because tests could stub historyACLPolicyManager + ///before tezclient start. If there is already a stubbed historyACLPolicyManager, we don't overwrite it if (this.amConfig.getTezConfiguration().get( TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") - .equals(atsHistoryLoggingServiceClassName)) { + .equals(atsHistoryLoggingServiceClassName) && (historyACLPolicyManager == null)) { LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); try { historyACLPolicyManager = ReflectionUtils.createClazzInstance( @@ -406,9 +415,21 @@ public class TezClient { } Map aclConfigs = null; - if (historyACLPolicyManager != null) { - aclConfigs = historyACLPolicyManager.setupSessionDAGACLs( - amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls()); + // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable logging of all dags within a session + boolean sessionHistoryLoggingEnabled = amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); + if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) { + try { + aclConfigs = historyACLPolicyManager.setupSessionDAGACLs( + amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls()); + } catch (HistoryACLPolicyException e) { + LOG.warn("Disabling history logging for dag " + + dag.getName() + " due to error in setting up history acls " + e); + dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); + } + } else if (!sessionHistoryLoggingEnabled) { + dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); } Map tezJarResources = getTezJarResources(sessionCredentials); http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 86ba0f9..8f5b48e 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -82,6 +82,7 @@ import org.apache.tez.common.TezYARNUtils; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.security.ACLManager; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.common.security.HistoryACLPolicyException; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; @@ -520,14 +521,30 @@ public class TezClientUtils { Map aclConfigs = null; if (historyACLPolicyManager != null) { if (dag == null) { - aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(), - appId); + try{ + aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(), + appId); + } catch (HistoryACLPolicyException e) { + LOG.warn("Disabling history logging for session " + strAppId + + " due to error in setting up history acls " + e); + amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + false); + } } else { - // Non-session mode - // As only a single DAG is support, we should combine AM and DAG ACLs under the same - // acl management layer - aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(), - appId, dag.getDagAccessControls()); + try{ + // Non-session mode + // As only a single DAG is support, we should combine AM and DAG ACLs under the same + // acl management layer + aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(), + appId, dag.getDagAccessControls()); + } catch (HistoryACLPolicyException e) { + LOG.warn("Disabling history logging for dag " + + dag.getName() + " due to error in setting up history acls " + e); + dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); + // This is non-session mode so disable logging for whole AM + amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + false); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java new file mode 100644 index 0000000..80229bc --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common.security; + +import org.apache.hadoop.classification.InterfaceAudience.Public; + +/** + * HistoryACLPolicyException Exception thrown upon error within HistoryACLPolicyManager + */ +@Public +public class HistoryACLPolicyException extends Exception { + private static final long serialVersionUID = 5337242734701961239L; + public HistoryACLPolicyException(Throwable cause) { super(cause); } + public HistoryACLPolicyException(String message) { super(message); } + public HistoryACLPolicyException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java index a3b62ec..dea89cc 100644 --- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java +++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.security.HistoryACLPolicyException; /** * ACL Policy Manager @@ -43,7 +44,7 @@ public interface HistoryACLPolicyManager extends Configurable { * @throws Exception */ public Map setupSessionACLs(Configuration conf, ApplicationId applicationId) - throws IOException; + throws IOException, HistoryACLPolicyException; /** * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode @@ -53,7 +54,7 @@ public interface HistoryACLPolicyManager extends Configurable { * @throws Exception */ public Map setupNonSessionACLs(Configuration conf, ApplicationId applicationId, - DAGAccessControls dagAccessControls) throws IOException; + DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException; /** * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session @@ -63,7 +64,7 @@ public interface HistoryACLPolicyManager extends Configurable { * @throws Exception */ public Map setupSessionDAGACLs(Configuration conf, ApplicationId applicationId, - String dagName, DAGAccessControls dagAccessControls) throws IOException; + String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException; public void updateTimelineEntityDomain(Object timelineEntity, String domainId); http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 4dffd4d..79e8759 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -331,6 +331,12 @@ public class DAG { } @Private + @VisibleForTesting + public Map getDagConf() { + return dagConf; + } + + @Private public Map getTaskLocalFiles() { return commonTaskLocalFiles; } http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 50b3e00..14e773d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -207,6 +207,15 @@ public class TezConfiguration extends Configuration { public static final boolean TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT = true; /** + * Boolean value. Determine whether to log history events per dag + */ + @ConfigurationScope(Scope.DAG) + @Private + public static final String TEZ_DAG_HISTORY_LOGGING_ENABLED = + TEZ_PREFIX + "dag.history.logging.enabled"; + public static final boolean TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT = true; + + /** * String value. Command line options which will be prepended to {@link #TEZ_AM_LAUNCH_CMD_OPTS} * during the launch of the AppMaster process. This property will typically be configured to * include default options meant to be used by all jobs in a cluster. If required, the values can @@ -711,7 +720,16 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_SESSION_MIN_HELD_CONTAINERS = TEZ_AM_PREFIX + "session.min.held-containers"; public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0; - + + /** + * Boolean value. Allow/disable logging for all dags in a session + */ + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_HISTORY_LOGGING_ENABLED = + TEZ_AM_PREFIX + "history.logging.enabled"; + public static final boolean TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT = true; + /** * Int value. Specifies the percentage of tasks eligible to be preempted that * will actually be preempted in a given round of Tez internal preemption. http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 90935ac..27b9c37 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1741,6 +1741,10 @@ public class DAGAppMaster extends AbstractService { recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason); + dagRecoveredEvent.setHistoryLoggingEnabled( + recoveredDAGData.recoveredDAG.getConf().getBoolean( + TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT)); this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(), dagRecoveredEvent)); dagEventDispatcher.handle(recoverDAGEvent); @@ -2122,6 +2126,10 @@ public class DAGAppMaster extends AbstractService { DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources, newDAG.getUserName(), newDAG.getConf()); + boolean dagLoggingEnabled = newDAG.getConf().getBoolean( + TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT); + submittedEvent.setHistoryLoggingEnabled(dagLoggingEnabled); try { historyEventHandler.handleCriticalEvent( new DAGHistoryEvent(newDAG.getID(), submittedEvent)); http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java index 5b44de2..7109756 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java @@ -38,6 +38,8 @@ public class DAGRecoveredEvent implements HistoryEvent { private final String dagName; private final String user; + private boolean historyLoggingEnabled = true; + public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId, TezDAGID dagId, String dagName, String user, long recoveredTime, DAGState recoveredState, @@ -111,6 +113,14 @@ public class DAGRecoveredEvent implements HistoryEvent { return user; } + public boolean isHistoryLoggingEnabled() { + return historyLoggingEnabled; + } + + public void setHistoryLoggingEnabled(boolean historyLoggingEnabled) { + this.historyLoggingEnabled = historyLoggingEnabled; + } + @Override public String toString() { return "applicationAttemptId=" http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index f5d58a9..978fd0c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -36,7 +36,6 @@ import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.utils.ProtoUtils; @@ -55,6 +54,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { private ApplicationAttemptId applicationAttemptId; private String user; private Map cumulativeAdditionalLocalResources; + private boolean historyLoggingEnabled = true; private Configuration conf; @@ -191,4 +191,11 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { return conf; } + public void setHistoryLoggingEnabled(boolean loggingEnabled) { + historyLoggingEnabled = loggingEnabled; + } + + public boolean isHistoryLoggingEnabled() { + return historyLoggingEnabled; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java index 23d3558..3fa3db6 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java @@ -38,6 +38,7 @@ import org.apache.tez.common.security.ACLManager; import org.apache.tez.common.security.ACLType; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.common.security.HistoryACLPolicyException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -115,7 +116,7 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { } private void createTimelineDomain(String domainId, Configuration tezConf, - DAGAccessControls dagAccessControls) throws IOException { + DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException { TimelineDomain timelineDomain = new TimelineDomain(); timelineDomain.setId(domainId); @@ -129,13 +130,15 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { } } catch (Exception e) { LOG.warn("Could not post timeline domain", e); + throw new + HistoryACLPolicyException("Fail to create ACL-related domain in Timeline", e); } } private Map createSessionDomain(Configuration tezConf, ApplicationId applicationId, DAGAccessControls dagAccessControls) - throws IOException { + throws IOException, HistoryACLPolicyException { String domainId = tezConf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED, @@ -169,7 +172,7 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { private Map createDAGDomain(Configuration tezConf, ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls) - throws IOException { + throws IOException, HistoryACLPolicyException { if (dagAccessControls == null) { // No DAG specific ACLs return null; @@ -221,19 +224,19 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { @Override public Map setupSessionACLs(Configuration conf, ApplicationId applicationId) - throws IOException { + throws IOException, HistoryACLPolicyException { return createSessionDomain(conf, applicationId, null); } @Override public Map setupNonSessionACLs(Configuration conf, ApplicationId applicationId, - DAGAccessControls dagAccessControls) throws IOException { + DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException { return createSessionDomain(conf, applicationId, dagAccessControls); } @Override public Map setupSessionDAGACLs(Configuration conf, ApplicationId applicationId, - String dagName, DAGAccessControls dagAccessControls) throws IOException { + String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException { return createDAGDomain(conf, applicationId, dagName, dagAccessControls); } http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index c03ce27..fc35971 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.ReflectionUtils; @@ -51,8 +52,13 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; import org.apache.tez.tests.MiniTezClusterWithTimeline; @@ -66,6 +72,11 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import org.mockito.Matchers; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doThrow; + public class TestATSHistoryWithACLs { private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryWithACLs.class); @@ -297,6 +308,332 @@ public class TestATSHistoryWithACLs { verifyEntityDomains(applicationId, false); } + /** + * test failure of domain creation during dag submittion in session mode + * only affect logging for that dag not following submitted dag + * @throws Exception + */ + @Test (timeout=50000) + public void testMultipleDagSession() throws Exception { + TezClient tezSession = null; + String viewAcls = "nobody nobody_group"; + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + DAGAccessControls accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody2")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2")); + dag.setAccessControls(accessControls); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + //////submit first dag which fails in dag creation////// + ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + myAclPolicyManager.timelineClient = mock(TimelineClient.class); + + doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.anyVararg()); + tezSession.setUpHistoryAclManager(myAclPolicyManager); + + DAGClient dagClient = tezSession.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + assertEquals(dagLogging, "false"); + + myAclPolicyManager.timelineClient = null; + myAclPolicyManager.setConf(tezConf); + tezSession.setUpHistoryAclManager(myAclPolicyManager); + + //////submit second dag which succeeds in dag creation////// + DAG dag2 = DAG.create("TezSleepProcessor2"); + vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag2.addVertex(vertex); + accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody3")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3")); + dag2.setAccessControls(accessControls); + dagClient = tezSession.submitDAG(dag2); + dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + Assert.assertNull(dagLogging); + tezSession.stop(); + } + +/** + * test failure of domain creation during dag submittion in nonsession mode + * only affect logging for that dag not following submitted dag + * @throws Exception + */ + @Test (timeout=50000) + public void testMultipleDagNonSession() throws Exception { + TezClient tezClient = null; + String viewAcls = "nobody nobody_group"; + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + DAGAccessControls accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody2")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2")); + dag.setAccessControls(accessControls); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezClient = TezClient.create("TezSleepProcessor", tezConf, false); + tezClient.start(); + + //////submit first dag which fails in dag creation////// + ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + myAclPolicyManager.timelineClient = mock(TimelineClient.class); + + doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.anyVararg()); + tezClient.setUpHistoryAclManager(myAclPolicyManager); + + DAGClient dagClient = tezClient.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + assertEquals(dagLogging, "false"); + + myAclPolicyManager.timelineClient = null; + myAclPolicyManager.setConf(tezConf); + tezClient.setUpHistoryAclManager(myAclPolicyManager); + + //////submit second dag which succeeds in dag creation////// + DAG dag2 = DAG.create("TezSleepProcessor2"); + vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag2.addVertex(vertex); + accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody3")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3")); + dag2.setAccessControls(accessControls); + dagClient = tezClient.submitDAG(dag2); + dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + Assert.assertNull(dagLogging); + tezClient.stop(); + } + /** + * Test Disable Logging for all dags in a session + * due to failure to create domain in session start + * @throws Exception + */ + @Test (timeout=50000) + public void testDisableSessionLogging() throws Exception { + TezClient tezSession = null; + String viewAcls = "nobody nobody_group"; + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + DAGAccessControls accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody2")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2")); + dag.setAccessControls(accessControls); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + myAclPolicyManager.timelineClient = mock(TimelineClient.class); + + doThrow(new IOException("Fail to Put Domain")). + when(myAclPolicyManager.timelineClient).putDomain(Matchers.anyVararg()); + tezSession.setUpHistoryAclManager(myAclPolicyManager); + tezSession.start(); + + ///substitute back mocked timelineClient with a normal one + myAclPolicyManager.timelineClient = null; + myAclPolicyManager.setConf(tezConf); + tezSession.setUpHistoryAclManager(myAclPolicyManager); + //////submit first dag ////// + DAGClient dagClient = tezSession.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + assertEquals(dagLogging, "false"); + + //////submit second dag////// + DAG dag2 = DAG.create("TezSleepProcessor2"); + vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag2.addVertex(vertex); + accessControls = new DAGAccessControls(); + accessControls.setUsersWithViewACLs(Collections.singleton("nobody3")); + accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3")); + dag2.setAccessControls(accessControls); + dagClient = tezSession.submitDAG(dag2); + dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); + assertEquals(dagLogging, "false"); + tezSession.stop(); + } + /** + * use mini cluster to verify data do not push to ats when the daglogging flag + * in dagsubmittedevent is set off + * @throws Exception + */ + @Test (timeout=50000) + public void testDagLoggingDisabled() throws Exception { + ATSHistoryLoggingService historyLoggingService; + historyLoggingService = + ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName()); + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + String viewAcls = "nobody nobody_group"; + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + historyLoggingService.serviceInit(tezConf); + historyLoggingService.serviceStart(); + ApplicationId appId = ApplicationId.newInstance(100l, 1); + TezDAGID tezDAGID = TezDAGID.getInstance( + appId, 100); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, + 1, dagPlan, appAttemptId, null, + "usr", tezConf); + submittedEvent.setHistoryLoggingEnabled(false); + DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); + historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); + Thread.sleep(1000l); + String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"+event.getDagID(); + Client client = new Client(); + WebResource resource = client.resource(url); + + ClientResponse response = resource.accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(404, response.getStatus()); + } + + /** + * use mini cluster to verify data do push to ats when + * the dag logging flag in dagsubmitted event is set on + * @throws Exception + */ + @Test (timeout=50000) + public void testDagLoggingEnabled() throws Exception { + ATSHistoryLoggingService historyLoggingService; + historyLoggingService = + ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName()); + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + String viewAcls = "nobody nobody_group"; + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + historyLoggingService.serviceInit(tezConf); + historyLoggingService.serviceStart(); + ApplicationId appId = ApplicationId.newInstance(100l, 1); + TezDAGID tezDAGID = TezDAGID.getInstance( + appId, 11); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, + 1, dagPlan, appAttemptId, null, + "usr", tezConf); + submittedEvent.setHistoryLoggingEnabled(true); + DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); + historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); + Thread.sleep(1000l); + String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"+event.getDagID(); + Client client = new Client(); + WebResource resource = client.resource(url); + + ClientResponse response = resource.accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(200, response.getStatus()); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + TimelineEntity entity = response.getEntity(TimelineEntity.class); + assertEquals(entity.getEntityType(), "TEZ_DAG_ID"); + assertEquals(entity.getEvents().get(0).getEventType(), HistoryEventType.DAG_SUBMITTED.toString()); + } + private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; @Test (timeout=50000) http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index 0abde11..9a2d77e 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -60,6 +61,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { private int eventCounter = 0; private int eventsProcessed = 0; private final Object lock = new Object(); + private boolean historyLoggingEnabled = true; @VisibleForTesting TimelineClient timelineClient; @@ -85,7 +87,15 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceInit(Configuration conf) throws Exception { + historyLoggingEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); + if (!historyLoggingEnabled) { + LOG.info("ATSService: History Logging disabled. " + + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false"); + return; + } LOG.info("Initializing ATSService"); + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { timelineClient = TimelineClient.createTimelineClient(); @@ -132,7 +142,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceStart() { - if (timelineClient == null) { + if (!historyLoggingEnabled || timelineClient == null) { return; } LOG.info("Starting ATSService"); @@ -186,7 +196,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceStop() { - if (timelineClient == null) { + if (!historyLoggingEnabled || timelineClient == null) { return; } LOG.info("Stopping ATSService" @@ -253,7 +263,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { public void handle(DAGHistoryEvent event) { - if (timelineClient != null) { + if (historyLoggingEnabled && timelineClient != null) { eventQueue.add(event); } } @@ -266,9 +276,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { DAGSubmittedEvent dagSubmittedEvent = (DAGSubmittedEvent) event.getHistoryEvent(); String dagName = dagSubmittedEvent.getDAGName(); - if (dagName != null - && dagName.startsWith( - TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) { + if ((dagName != null + && dagName.startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) + || (!dagSubmittedEvent.isHistoryLoggingEnabled())) { // Skip recording pre-warm DAG events skippedDAGs.add(dagId); return false; @@ -281,6 +291,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } } } + if (eventType.equals(HistoryEventType.DAG_RECOVERED)) { + DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent(); + if (!dagRecoveredEvent.isHistoryLoggingEnabled()) { + skippedDAGs.add(dagRecoveredEvent.getDagID()); + return false; + } + } if (eventType.equals(HistoryEventType.DAG_FINISHED)) { // Remove from set to keep size small // No more events should be seen after this point.