tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [15/37] tez git commit: TEZ-2226. Disable writing history to timeline if domain creation fails. (Chang Li via hitesh)
Date Tue, 28 Apr 2015 20:40:56 GMT
TEZ-2226. Disable writing history to timeline if domain creation fails. (Chang Li via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5b2f011f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5b2f011f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5b2f011f

Branch: refs/heads/TEZ-2003
Commit: 5b2f011f105d78238a3829218299e39594dc1560
Parents: ecf8c43
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Apr 28 09:39:17 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Apr 28 09:39:17 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  29 +-
 .../org/apache/tez/client/TezClientUtils.java   |  31 +-
 .../security/HistoryACLPolicyException.java     |  34 ++
 .../security/HistoryACLPolicyManager.java       |   7 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |   6 +
 .../apache/tez/dag/api/TezConfiguration.java    |  20 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +
 .../dag/history/events/DAGRecoveredEvent.java   |  10 +
 .../dag/history/events/DAGSubmittedEvent.java   |   9 +-
 .../ats/acls/ATSHistoryACLPolicyManager.java    |  15 +-
 .../ats/acls/TestATSHistoryWithACLs.java        | 339 ++++++++++++++++++-
 .../logging/ats/ATSHistoryLoggingService.java   |  29 +-
 13 files changed, 509 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ea191d..e676cde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -151,6 +151,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2226. Disable writing history to timeline if domain creation fails.
   TEZ-2259. Push additional data to Timeline for Recovery for better consumption in UI.
   TEZ-2365. Update tez-ui war's license/notice to reflect OFL license correctly.
   TEZ-2329. UI Query on final dag status performance improvement

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index bf157d1..4f57d5e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -286,6 +287,12 @@ public class TezClient {
     amConfig.setCredentials(credentials);
   }
   
+  @Private
+  @VisibleForTesting
+  public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager)
{
+    historyACLPolicyManager =  myAclPolicyManager;
+  }
+
   /**
    * Start the client. This establishes a connection to the YARN cluster.
    * In session mode, this start the App Master thats runs all the DAGs in the
@@ -300,9 +307,11 @@ public class TezClient {
     frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
     frameworkClient.start();
 
+    ///need additional check for historyACLPolicyManager because tests could stub historyACLPolicyManager
+    ///before tezclient start. If there is already a stubbed historyACLPolicyManager, we
don't overwrite it
     if (this.amConfig.getTezConfiguration().get(
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
-        .equals(atsHistoryLoggingServiceClassName)) {
+        .equals(atsHistoryLoggingServiceClassName) && (historyACLPolicyManager ==
null)) {
       LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
       try {
         historyACLPolicyManager = ReflectionUtils.createClazzInstance(
@@ -406,9 +415,21 @@ public class TezClient {
     }
 
     Map<String, String> aclConfigs = null;
-    if (historyACLPolicyManager != null) {
-      aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
-          amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls());
+    // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable logging of all dags
within a session
+    boolean sessionHistoryLoggingEnabled = amConfig.getTezConfiguration().getBoolean(
+        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+    if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) {
+      try {
+        aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
+            amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls());
+      } catch (HistoryACLPolicyException e) {
+        LOG.warn("Disabling history logging for dag " +
+          dag.getName() + " due to error in setting up history acls " + e);
+        dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
+      }
+    } else if (!sessionHistoryLoggingEnabled) {
+      dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
     }
 
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 86ba0f9..8f5b48e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -82,6 +82,7 @@ import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -520,14 +521,30 @@ public class TezClientUtils {
     Map<String, String> aclConfigs = null;
     if (historyACLPolicyManager != null) {
       if (dag == null) {
-        aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
-            appId);
+        try{
+          aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
+              appId);
+        } catch (HistoryACLPolicyException e) {
+          LOG.warn("Disabling history logging for session " + strAppId +
+                   " due to error in setting up history acls " + e);
+          amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+              false);
+        }
       } else {
-        // Non-session mode
-        // As only a single DAG is support, we should combine AM and DAG ACLs under the same
-        // acl management layer
-        aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
-            appId, dag.getDagAccessControls());
+        try{
+          // Non-session mode
+          // As only a single DAG is support, we should combine AM and DAG ACLs under the
same
+          // acl management layer
+          aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
+              appId, dag.getDagAccessControls());
+        } catch (HistoryACLPolicyException e) {
+          LOG.warn("Disabling history logging for dag " +
+              dag.getName() + " due to error in setting up history acls " + e);
+          dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
+          // This is non-session mode so disable logging for whole AM
+          amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+              false);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java
b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java
new file mode 100644
index 0000000..80229bc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.security;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+
+/**
+ * HistoryACLPolicyException Exception thrown upon error within HistoryACLPolicyManager
+ */
+@Public
+public class HistoryACLPolicyException extends Exception {
+  private static final long serialVersionUID = 5337242734701961239L;
+  public HistoryACLPolicyException(Throwable cause) { super(cause); }
+  public HistoryACLPolicyException(String message) { super(message); }
+  public HistoryACLPolicyException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
index a3b62ec..dea89cc 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 
 /**
  * ACL Policy Manager
@@ -43,7 +44,7 @@ public interface HistoryACLPolicyManager extends Configurable {
    * @throws Exception
    */
   public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
-      throws IOException;
+      throws IOException, HistoryACLPolicyException;
 
   /**
    * Take any necessary steps for setting up ACLs for an AM which is running in non-session
mode
@@ -53,7 +54,7 @@ public interface HistoryACLPolicyManager extends Configurable {
    * @throws Exception
    */
   public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId
applicationId,
-      DAGAccessControls dagAccessControls) throws IOException;
+      DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
 
   /**
    * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session
@@ -63,7 +64,7 @@ public interface HistoryACLPolicyManager extends Configurable {
    * @throws Exception
    */
   public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId
applicationId,
-      String dagName, DAGAccessControls dagAccessControls) throws IOException;
+      String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
 
 
   public void updateTimelineEntityDomain(Object timelineEntity, String domainId);

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 4dffd4d..79e8759 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -331,6 +331,12 @@ public class DAG {
   }
 
   @Private
+  @VisibleForTesting
+  public Map<String,String> getDagConf() {
+    return dagConf;
+  }
+
+  @Private
   public Map<String, LocalResource> getTaskLocalFiles() {
     return commonTaskLocalFiles;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 50b3e00..14e773d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -207,6 +207,15 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT = true;
 
   /**
+   * Boolean value. Determine whether to log history events per dag
+   */
+  @ConfigurationScope(Scope.DAG)
+  @Private
+  public static final String TEZ_DAG_HISTORY_LOGGING_ENABLED =
+      TEZ_PREFIX + "dag.history.logging.enabled";
+  public static final boolean TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT = true;
+
+  /**
    * String value. Command line options which will be prepended to {@link #TEZ_AM_LAUNCH_CMD_OPTS}
    * during the launch of the AppMaster process. This property will typically be configured
to
    * include default options meant to be used by all jobs in a cluster. If required, the
values can
@@ -711,7 +720,16 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_SESSION_MIN_HELD_CONTAINERS = 
       TEZ_AM_PREFIX + "session.min.held-containers";
   public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
-  
+
+  /**
+   * Boolean value. Allow/disable logging for all dags in a session   
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_HISTORY_LOGGING_ENABLED =
+      TEZ_AM_PREFIX + "history.logging.enabled";
+  public static final boolean TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT = true;
+
   /**
    * Int value. Specifies the percentage of tasks eligible to be preempted that
    * will actually be preempted in a given round of Tez internal preemption.

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 90935ac..27b9c37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1741,6 +1741,10 @@ public class DAGAppMaster extends AbstractService {
               recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
               recoveredDAGData.recoveredDAG.getUserName(),
               this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason);
+          dagRecoveredEvent.setHistoryLoggingEnabled(
+              recoveredDAGData.recoveredDAG.getConf().getBoolean(
+                  TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
+                  TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT));
           this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
               dagRecoveredEvent));
           dagEventDispatcher.handle(recoverDAGEvent);
@@ -2122,6 +2126,10 @@ public class DAGAppMaster extends AbstractService {
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
         newDAG.getUserName(), newDAG.getConf());
+    boolean dagLoggingEnabled = newDAG.getConf().getBoolean(
+        TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
+        TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT);
+    submittedEvent.setHistoryLoggingEnabled(dagLoggingEnabled);
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
index 5b44de2..7109756 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
@@ -38,6 +38,8 @@ public class DAGRecoveredEvent implements HistoryEvent {
   private final String dagName;
   private final String user;
 
+  private boolean historyLoggingEnabled = true;
+
   public DAGRecoveredEvent(ApplicationAttemptId applicationAttemptId,
       TezDAGID dagId, String dagName, String user,
       long recoveredTime, DAGState recoveredState,
@@ -111,6 +113,14 @@ public class DAGRecoveredEvent implements HistoryEvent {
     return user;
   }
 
+  public boolean isHistoryLoggingEnabled() {
+    return historyLoggingEnabled;
+  }
+
+  public void setHistoryLoggingEnabled(boolean historyLoggingEnabled) {
+    this.historyLoggingEnabled = historyLoggingEnabled;
+  }
+
   @Override
   public String toString() {
     return "applicationAttemptId="

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index f5d58a9..978fd0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
@@ -55,6 +54,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private ApplicationAttemptId applicationAttemptId;
   private String user;
   private Map<String, LocalResource> cumulativeAdditionalLocalResources;
+  private boolean historyLoggingEnabled = true;
 
   private Configuration conf;
 
@@ -191,4 +191,11 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent
{
     return conf;
   }
 
+  public void setHistoryLoggingEnabled(boolean loggingEnabled) {
+    historyLoggingEnabled = loggingEnabled;
+  }
+
+  public boolean isHistoryLoggingEnabled() {
+    return historyLoggingEnabled;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
index 23d3558..3fa3db6 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -38,6 +38,7 @@ import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.common.security.ACLType;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -115,7 +116,7 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager
{
   }
 
   private void createTimelineDomain(String domainId, Configuration tezConf,
-      DAGAccessControls dagAccessControls) throws IOException {
+      DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException
{
     TimelineDomain timelineDomain = new TimelineDomain();
     timelineDomain.setId(domainId);
 
@@ -129,13 +130,15 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager
{
       }
     } catch (Exception e) {
       LOG.warn("Could not post timeline domain", e);
+      throw new
+        HistoryACLPolicyException("Fail to create ACL-related domain in Timeline", e);
     }
   }
 
 
   private Map<String, String> createSessionDomain(Configuration tezConf,
       ApplicationId applicationId, DAGAccessControls dagAccessControls)
-      throws IOException {
+      throws IOException, HistoryACLPolicyException {
     String domainId =
         tezConf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
     if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -169,7 +172,7 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager
{
 
   private Map<String, String> createDAGDomain(Configuration tezConf,
       ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
-      throws IOException {
+      throws IOException, HistoryACLPolicyException {
     if (dagAccessControls == null) {
       // No DAG specific ACLs
       return null;
@@ -221,19 +224,19 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager
{
 
   @Override
   public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
-      throws IOException {
+      throws IOException, HistoryACLPolicyException {
     return createSessionDomain(conf, applicationId, null);
   }
 
   @Override
   public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId
applicationId,
-      DAGAccessControls dagAccessControls) throws IOException {
+      DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException
{
     return createSessionDomain(conf, applicationId, dagAccessControls);
   }
 
   @Override
   public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId
applicationId,
-      String dagName, DAGAccessControls dagAccessControls) throws IOException {
+      String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException
{
     return createDAGDomain(conf, applicationId, dagName, dagAccessControls);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index c03ce27..fc35971 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.ReflectionUtils;
@@ -51,8 +52,13 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.apache.tez.tests.MiniTezClusterWithTimeline;
@@ -66,6 +72,11 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 
+import org.mockito.Matchers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
+
 public class TestATSHistoryWithACLs {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryWithACLs.class);
@@ -297,6 +308,332 @@ public class TestATSHistoryWithACLs {
     verifyEntityDomains(applicationId, false);
   }
 
+  /**
+   * test failure of domain creation during dag submittion in session mode
+   * only affect logging for that dag not following submitted dag 
+   * @throws Exception
+   */
+  @Test (timeout=50000)
+  public void testMultipleDagSession() throws Exception {
+    TezClient tezSession = null;
+    String viewAcls = "nobody nobody_group";
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = DAG.create("TezSleepProcessor");
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            Resource.newInstance(256, 1));
+    dag.addVertex(vertex);
+    DAGAccessControls accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
+    dag.setAccessControls(accessControls);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+    tezSession.start();
+
+    //////submit first dag which fails in dag creation//////
+    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
+              atsHistoryACLManagerClassName);
+    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
+
+    doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
+    tezSession.setUpHistoryAclManager(myAclPolicyManager);
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    assertEquals(dagLogging, "false");
+    
+    myAclPolicyManager.timelineClient = null;
+    myAclPolicyManager.setConf(tezConf);
+    tezSession.setUpHistoryAclManager(myAclPolicyManager);
+
+    //////submit second dag which succeeds in dag creation//////
+    DAG dag2 = DAG.create("TezSleepProcessor2");
+    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+       Resource.newInstance(256, 1));
+    dag2.addVertex(vertex);
+    accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
+    dag2.setAccessControls(accessControls);
+    dagClient = tezSession.submitDAG(dag2);
+    dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+                + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    Assert.assertNull(dagLogging);
+    tezSession.stop();
+  }
+  
+/**
+ * test failure of domain creation during dag submittion in nonsession mode
+ * only affect logging for that dag not following submitted dag 
+ * @throws Exception
+ */
+  @Test (timeout=50000)
+  public void testMultipleDagNonSession() throws Exception {
+    TezClient tezClient = null;
+    String viewAcls = "nobody nobody_group";
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = DAG.create("TezSleepProcessor");
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            Resource.newInstance(256, 1));
+    dag.addVertex(vertex);
+    DAGAccessControls accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
+    dag.setAccessControls(accessControls);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    tezClient = TezClient.create("TezSleepProcessor", tezConf, false);
+    tezClient.start();
+
+    //////submit first dag which fails in dag creation//////
+    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
+              atsHistoryACLManagerClassName);
+    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
+
+    doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
+    tezClient.setUpHistoryAclManager(myAclPolicyManager);
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    assertEquals(dagLogging, "false");
+    
+    myAclPolicyManager.timelineClient = null;
+    myAclPolicyManager.setConf(tezConf);
+    tezClient.setUpHistoryAclManager(myAclPolicyManager);
+
+    //////submit second dag which succeeds in dag creation//////
+    DAG dag2 = DAG.create("TezSleepProcessor2");
+    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+           SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(256, 1));
+    dag2.addVertex(vertex);
+    accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
+    dag2.setAccessControls(accessControls);
+    dagClient = tezClient.submitDAG(dag2);
+    dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+                   + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    Assert.assertNull(dagLogging);
+    tezClient.stop();
+  }
+  /**
+   * Test Disable Logging for all dags in a session 
+   * due to failure to create domain in session start
+   * @throws Exception
+   */
+  @Test (timeout=50000)
+  public void testDisableSessionLogging() throws Exception {
+    TezClient tezSession = null;
+    String viewAcls = "nobody nobody_group";
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = DAG.create("TezSleepProcessor");
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            Resource.newInstance(256, 1));
+    dag.addVertex(vertex);
+    DAGAccessControls accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
+    dag.setAccessControls(accessControls);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
+            atsHistoryACLManagerClassName);
+    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
+
+    doThrow(new IOException("Fail to Put Domain")).
+        when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
+    tezSession.setUpHistoryAclManager(myAclPolicyManager);
+    tezSession.start();
+
+    ///substitute back mocked timelineClient with a normal one
+    myAclPolicyManager.timelineClient = null;
+    myAclPolicyManager.setConf(tezConf);
+    tezSession.setUpHistoryAclManager(myAclPolicyManager);
+    //////submit first dag //////
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    assertEquals(dagLogging, "false");
+ 
+    //////submit second dag//////
+    DAG dag2 = DAG.create("TezSleepProcessor2");
+    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+       Resource.newInstance(256, 1));
+    dag2.addVertex(vertex);
+    accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
+    dag2.setAccessControls(accessControls);
+    dagClient = tezSession.submitDAG(dag2);
+    dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+                + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
+    assertEquals(dagLogging, "false");
+    tezSession.stop();
+  }
+  /**
+   * use mini cluster to verify data do not push to ats when the daglogging flag
+   * in dagsubmittedevent is set off
+   * @throws Exception
+   */
+  @Test (timeout=50000)
+  public void testDagLoggingDisabled() throws Exception {
+    ATSHistoryLoggingService historyLoggingService;
+    historyLoggingService =
+        ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    String viewAcls = "nobody nobody_group";
+    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+    historyLoggingService.serviceInit(tezConf);
+    historyLoggingService.serviceStart();
+    ApplicationId appId = ApplicationId.newInstance(100l, 1);
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+                        appId, 100);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
+          1, dagPlan, appAttemptId, null,
+          "usr", tezConf);
+    submittedEvent.setHistoryLoggingEnabled(false);
+    DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
+    historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
+    Thread.sleep(1000l);
+    String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"+event.getDagID();
+    Client client = new Client();
+    WebResource resource = client.resource(url);
+
+    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(404, response.getStatus());
+  }
+  
+  /**
+   * use mini cluster to verify data do push to ats when
+   * the dag logging flag in dagsubmitted event is set on
+   * @throws Exception
+   */
+  @Test (timeout=50000)
+  public void testDagLoggingEnabled() throws Exception {
+    ATSHistoryLoggingService historyLoggingService;
+    historyLoggingService =
+            ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    String viewAcls = "nobody nobody_group";
+    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+    historyLoggingService.serviceInit(tezConf);
+    historyLoggingService.serviceStart();
+    ApplicationId appId = ApplicationId.newInstance(100l, 1);
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+                        appId, 11);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
+            1, dagPlan, appAttemptId, null,
+            "usr", tezConf);
+    submittedEvent.setHistoryLoggingEnabled(true);
+    DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
+    historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
+    Thread.sleep(1000l);
+    String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"+event.getDagID();
+    Client client = new Client();
+    WebResource resource = client.resource(url);
+
+    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(200, response.getStatus());
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    TimelineEntity entity = response.getEntity(TimelineEntity.class);
+    assertEquals(entity.getEntityType(), "TEZ_DAG_ID");
+    assertEquals(entity.getEvents().get(0).getEventType(), HistoryEventType.DAG_SUBMITTED.toString());
+  }
+  
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
   @Test (timeout=50000)

http://git-wip-us.apache.org/repos/asf/tez/blob/5b2f011f/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 0abde11..9a2d77e 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
   private int eventCounter = 0;
   private int eventsProcessed = 0;
   private final Object lock = new Object();
+  private boolean historyLoggingEnabled = true;
 
   @VisibleForTesting
   TimelineClient timelineClient;
@@ -85,7 +87,15 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
+    historyLoggingEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+    if (!historyLoggingEnabled) {
+      LOG.info("ATSService: History Logging disabled. "
+          + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
+      return;
+    }
     LOG.info("Initializing ATSService");
+
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
       YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
       timelineClient = TimelineClient.createTimelineClient();
@@ -132,7 +142,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStart() {
-    if (timelineClient == null) {
+    if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
     LOG.info("Starting ATSService");
@@ -186,7 +196,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStop() {
-    if (timelineClient == null) {
+    if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
     LOG.info("Stopping ATSService"
@@ -253,7 +263,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
 
   public void handle(DAGHistoryEvent event) {
-    if (timelineClient != null) {
+    if (historyLoggingEnabled && timelineClient != null) {
       eventQueue.add(event);
     }
   }
@@ -266,9 +276,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       DAGSubmittedEvent dagSubmittedEvent =
           (DAGSubmittedEvent) event.getHistoryEvent();
       String dagName = dagSubmittedEvent.getDAGName();
-      if (dagName != null
-          && dagName.startsWith(
-          TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+      if ((dagName != null
+          && dagName.startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX))
+          || (!dagSubmittedEvent.isHistoryLoggingEnabled())) {
         // Skip recording pre-warm DAG events
         skippedDAGs.add(dagId);
         return false;
@@ -281,6 +291,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
         }
       }
     }
+    if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
+      DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent();
+      if (!dagRecoveredEvent.isHistoryLoggingEnabled()) {
+        skippedDAGs.add(dagRecoveredEvent.getDagID());
+        return false;
+      }
+    }
     if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
       // Remove from set to keep size small
       // No more events should be seen after this point.


Mime
View raw message