tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3359. Add granular log levels for HistoryLoggingService. (Harish Jaiprakash via hitesh)
Date Tue, 26 Jul 2016 17:44:29 GMT
Repository: tez
Updated Branches:
  refs/heads/master ac37c4a49 -> cbc0c6376


TEZ-3359. Add granular log levels for HistoryLoggingService. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbc0c637
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbc0c637
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbc0c637

Branch: refs/heads/master
Commit: cbc0c63761ecb81f4805cd8318cd347f0bc7674e
Parents: ac37c4a
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Jul 26 10:44:08 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Jul 26 10:44:08 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tez/client/TezClient.java   |  14 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  42 +++-
 .../org/apache/tez/dag/api/HistoryLogLevel.java |  63 ++++++
 .../apache/tez/dag/api/TezConfiguration.java    |  14 +-
 .../java/org/apache/tez/dag/api/TestDAG.java    |  76 +++++++
 .../apache/tez/dag/api/TestHistoryLogLevel.java |  63 ++++++
 .../tez/dag/history/HistoryEventHandler.java    |  45 +++-
 .../tez/dag/history/HistoryEventType.java       |  59 +++---
 .../dag/history/TestHistoryEventHandler.java    | 204 +++++++++++++++++++
 .../dag/history/ats/acls/TestATSHistoryV15.java |  98 +++++----
 11 files changed, 611 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb39981..5f25985 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3359. Add granular log levels for HistoryLoggingService.
   TEZ-3374. Change TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP conf key name.
   TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
@@ -89,6 +90,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3359. Add granular log levels for HistoryLoggingService.
   TEZ-3374. Change TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP conf key name.
   TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index f359a26..df39c0a 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -62,6 +62,7 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotReady;
 import org.apache.tez.dag.api.SessionNotRunning;
@@ -362,7 +363,18 @@ public class TezClient {
             "Credentials cannot be set after the session App Master has been started");
     amConfig.setCredentials(credentials);
   }
-  
+
+  /**
+   * Sets the history log level for this session. It will be in effect for DAGs submitted after this
+   * call.
+   *
+   * @param historyLogLevel The log level to be used.
+   */
+  public synchronized void setHistoryLogLevel(HistoryLogLevel historyLogLevel) {
+    amConfig.getTezConfiguration().setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL,
+        historyLogLevel);
+  }
+
   @Private
   @VisibleForTesting
  public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 0eb51e1..65321a8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -375,6 +375,18 @@ public class DAG {
   }
 
   /**
+   * Set history log level for this DAG. This config overrides the default or one set at the session
+   * level.
+   *
+   * @param historyLogLevel The ATS history log level for this DAG.
+   *
+   * @return this DAG
+   */
+  public DAG setHistoryLogLevel(HistoryLogLevel historyLogLevel) {
+    return this.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, historyLogLevel.name());
+  }
+
+  /**
    * Sets the default execution context for the DAG. This can be overridden at a per Vertex level.
    * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)}
    *
@@ -1005,7 +1017,6 @@ public class DAG {
       dagBuilder.addEdge(edgeBuilder);
     }
 
-
     ConfigurationProto.Builder confProtoBuilder =
         ConfigurationProto.newBuilder();
     if (dagAccessControls != null) {
@@ -1030,7 +1041,7 @@ public class DAG {
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
-    if (this.dagConf != null && !this.dagConf.isEmpty()) {
+    if (!this.dagConf.isEmpty()) {
       for (Entry<String, String> entry : this.dagConf.entrySet()) {
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
         kvp.setKey(entry.getKey());
@@ -1038,13 +1049,38 @@ public class DAG {
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
+    // Copy historyLogLevel from tezConf into dagConf if its not overridden in dagConf.
+    String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+    if (logLevel != null) {
+      // The config is from dagConf, we have already added it to the proto above, just check if
+      // the value is valid.
+      if (!HistoryLogLevel.validateLogLevel(logLevel)) {
+        throw new IllegalArgumentException(
+            "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+            " is set to invalid value: " + logLevel);
+      }
+    } else {
+      // Validate and set value from tezConf.
+      logLevel = tezConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+      if (logLevel != null) {
+        if (!HistoryLogLevel.validateLogLevel(logLevel)) {
+          throw new IllegalArgumentException(
+              "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+              " is set to invalid value: " + logLevel);
+        }
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+        kvp.setValue(logLevel);
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+    }
     dagBuilder.setDagConf(confProtoBuilder);
 
     if (dagCredentials != null) {
       dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
       TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");
     }
-    
+
     return dagBuilder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
new file mode 100644
index 0000000..5eb4785
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.Locale;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The log level for the history, this is used to determine which events are sent to the history
+ * logger. The default level is ALL.
+ */
+@Public
+public enum HistoryLogLevel {
+  NONE,
+  AM,
+  DAG,
+  VERTEX,
+  TASK,
+  ALL;
+
+  public static final HistoryLogLevel DEFAULT = ALL;
+
+  public boolean shouldLog(HistoryLogLevel eventLevel) {
+    return eventLevel.ordinal() <= ordinal();
+  }
+
+  public static HistoryLogLevel getLogLevel(Configuration conf, HistoryLogLevel defaultValue) {
+    String logLevel = conf.getTrimmed(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+    if (logLevel == null) {
+      return defaultValue;
+    }
+    return valueOf(logLevel.toUpperCase(Locale.ENGLISH));
+  }
+
+  public static boolean validateLogLevel(String logLevel) {
+    if (logLevel != null) {
+      try {
+        valueOf(logLevel.toUpperCase(Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 005304a..15e937f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1229,9 +1229,19 @@ public class TezConfiguration extends Configuration {
       "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
 
   /**
+   * Enum value. Config to limit the type of events published to the history logging service.
+   * The valid log levels are defined in the enum {@link HistoryLogLevel}. The default value is
+   * defined in {@link HistoryLogLevel#DEFAULT}.
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty
+  public static final String TEZ_HISTORY_LOGGING_LOGLEVEL =
+      TEZ_PREFIX + "history.logging.log.level";
+
+  /**
    * Comma separated list of Integers. These are the values that were set for the config value
-   * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so that
-   * the groupIds generated previously will continue to be generated by the plugin. If an older
+   * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so
+   * that the groupIds generated previously will continue to be generated by the plugin. If an older
    * value is not present then the UI may not show information for DAGs which were created
    * with a different grouping value.
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index ae5dfbb..05c4e30 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -30,7 +30,9 @@ import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan.Builder;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -361,4 +363,78 @@ public class TestDAG {
         Assert.assertEquals(dagPlan, firstPlan);
     }
   }
+
+  @Test
+  public void testCreateDAGForHistoryLogLevel() {
+    Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+        LocalResource.newInstance(
+            URL.newInstance("file", "localhost", 0, "/test1"),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1,
+        Resource.newInstance(1, 1));
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
+        Resource.newInstance(1, 1));
+    DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
+
+    TezConfiguration tezConf = new TezConfiguration();
+
+    // Expect null when history log level is not set in both dag and tezConf
+    DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false);
+    Builder builder = DAGPlan.newBuilder(dagPlan);
+    Assert.assertNull(findKVP(builder.getDagConf(), TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+    // Set tezConf but not dag, expect value in tezConf.
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "TASK");
+    dagPlan = dag.createDag(tezConf, null, null, null, false);
+    Assert.assertEquals("TASK", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+        TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+    // Set invalid value in tezConf, expect exception.
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid");
+    try {
+      dagPlan = dag.createDag(tezConf, null, null, null, false);
+      Assert.fail("Expected illegal argument exception");
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals("Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+            " is set to invalid value: invalid", e.getMessage());
+    }
+
+    // Set value in dag, should override tez conf value.
+    dag.setHistoryLogLevel(HistoryLogLevel.VERTEX);
+    dagPlan = dag.createDag(tezConf, null, null, null, false);
+    Assert.assertEquals("VERTEX", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+        TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+    // Set value directly into dagConf.
+    dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, HistoryLogLevel.DAG.name());
+    dagPlan = dag.createDag(tezConf, null, null, null, false);
+    Assert.assertEquals("DAG", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+        TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+    // Set value invalid directly into dagConf and expect exception.
+    dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid");
+    try {
+      dagPlan = dag.createDag(tezConf, null, null, null, false);
+      Assert.fail("Expected illegal argument exception");
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals("Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+            " is set to invalid value: invalid", e.getMessage());
+    }
+  }
+
+  private String findKVP(ConfigurationProto conf, String key) {
+    String foundValue = null;
+    for (int i = 0; i < conf.getConfKeyValuesCount(); ++i) {
+      if (conf.getConfKeyValues(i).getKey().equals(key)) {
+        if (foundValue == null) {
+          foundValue = conf.getConfKeyValues(i).getValue();
+        } else {
+          Assert.fail("Multiple values found: " + foundValue + ", " +
+              conf.getConfKeyValues(i).getValue());
+        }
+      }
+    }
+    return foundValue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
new file mode 100644
index 0000000..76c944d
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestHistoryLogLevel {
+
+  @Test
+  public void testGetLogLevel() {
+    assertNull(HistoryLogLevel.getLogLevel(getConfiguration(null), null));
+    assertEquals(HistoryLogLevel.DEFAULT,
+        HistoryLogLevel.getLogLevel(getConfiguration(null), HistoryLogLevel.DEFAULT));
+    assertEquals(HistoryLogLevel.NONE,
+        HistoryLogLevel.getLogLevel(getConfiguration("NONE"), HistoryLogLevel.DEFAULT));
+    assertEquals(HistoryLogLevel.NONE,
+        HistoryLogLevel.getLogLevel(getConfiguration("none"), HistoryLogLevel.DEFAULT));
+    try {
+      HistoryLogLevel.getLogLevel(getConfiguration("invalid"), HistoryLogLevel.DEFAULT);
+      fail("Expected IllegalArugment Exception");
+    } catch (IllegalArgumentException e) {
+    }
+  }
+
+  @Test
+  public void testValidateLogLevel() {
+    assertTrue(HistoryLogLevel.validateLogLevel(null));
+    assertTrue(HistoryLogLevel.validateLogLevel("NONE"));
+    assertTrue(HistoryLogLevel.validateLogLevel("none"));
+    assertFalse(HistoryLogLevel.validateLogLevel("invalid"));
+  }
+
+  private Configuration getConfiguration(String confValue) {
+    Configuration conf = new Configuration(false);
+    if (confValue != null) {
+      conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, confValue);
+    }
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 95ff0cd..042d022 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -19,14 +19,18 @@
 package org.apache.tez.dag.history;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
@@ -40,6 +44,10 @@ public class HistoryEventHandler extends CompositeService {
   private boolean recoveryEnabled;
   private HistoryLoggingService historyLoggingService;
 
+  private HistoryLogLevel amHistoryLogLevel;
+  private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel =
+      new ConcurrentHashMap<TezDAGID, HistoryLogLevel>();
+
   public HistoryEventHandler(AppContext context) {
     super(HistoryEventHandler.class.getName());
     this.context = context;
@@ -70,8 +78,10 @@ public class HistoryEventHandler extends CompositeService {
           new Class[]{AppContext.class}, new Object[] {context});
       addService(recoveryService);
     }
-    super.serviceInit(conf);
 
+    amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(), HistoryLogLevel.DEFAULT);
+
+    super.serviceInit(conf);
   }
 
   @Override
@@ -106,7 +116,7 @@ public class HistoryEventHandler extends CompositeService {
     if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
       recoveryService.handle(event);
     }
-    if (event.getHistoryEvent().isHistoryEvent()) {
+    if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) {
       historyLoggingService.handle(event);
     }
 
@@ -118,6 +128,37 @@ public class HistoryEventHandler extends CompositeService {
         + ": " + event.getHistoryEvent().toString());
   }
 
+  private boolean shouldLogEvent(DAGHistoryEvent event) {
+    TezDAGID dagId = event.getDagID();
+
+    HistoryLogLevel dagLogLevel = null;
+    if (dagId != null) {
+      dagLogLevel = dagIdToLogLevel.get(dagId);
+    }
+    if (dagLogLevel == null) {
+      dagLogLevel = amHistoryLogLevel;
+    }
+
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) {
+      dagLogLevel = HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(),
+          amHistoryLogLevel);
+      dagIdToLogLevel.put(dagId, dagLogLevel);
+    } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) {
+      if (context.getCurrentDAG() != null) {
+        dagLogLevel = HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(),
+            amHistoryLogLevel);
+        dagIdToLogLevel.put(dagId, dagLogLevel);
+      }
+    } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+      if (dagIdToLogLevel.containsKey(dagId)) {
+        dagIdToLogLevel.remove(dagId);
+      }
+    }
+
+    return dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel());
+  }
+
   public void handle(DAGHistoryEvent event) {
     try {
       handleCriticalEvent(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 9bf98df..a41d0e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -18,32 +18,39 @@
 
 package org.apache.tez.dag.history;
 
+import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 public enum HistoryEventType {
-  APP_LAUNCHED,
-  AM_LAUNCHED,
-  AM_STARTED,
-  DAG_SUBMITTED,
-  DAG_INITIALIZED,
-  DAG_STARTED,
-  DAG_FINISHED,
-  DAG_KILL_REQUEST,
-  VERTEX_INITIALIZED,
-  VERTEX_STARTED,
-  VERTEX_CONFIGURE_DONE,
-  VERTEX_FINISHED,
-  TASK_STARTED,
-  TASK_FINISHED,
-  TASK_ATTEMPT_STARTED,
-  TASK_ATTEMPT_FINISHED,
-  CONTAINER_LAUNCHED,
-  CONTAINER_STOPPED,
-  DAG_COMMIT_STARTED,
-  VERTEX_COMMIT_STARTED,
-  VERTEX_GROUP_COMMIT_STARTED,
-  VERTEX_GROUP_COMMIT_FINISHED,
-  DAG_RECOVERED;
+  APP_LAUNCHED(HistoryLogLevel.AM),
+  AM_LAUNCHED(HistoryLogLevel.AM),
+  AM_STARTED(HistoryLogLevel.AM),
+  DAG_SUBMITTED(HistoryLogLevel.DAG),
+  DAG_INITIALIZED(HistoryLogLevel.DAG),
+  DAG_STARTED(HistoryLogLevel.DAG),
+  DAG_FINISHED(HistoryLogLevel.DAG),
+  DAG_KILL_REQUEST(HistoryLogLevel.DAG),
+  VERTEX_INITIALIZED(HistoryLogLevel.VERTEX),
+  VERTEX_STARTED(HistoryLogLevel.VERTEX),
+  VERTEX_CONFIGURE_DONE(HistoryLogLevel.VERTEX),
+  VERTEX_FINISHED(HistoryLogLevel.VERTEX),
+  TASK_STARTED(HistoryLogLevel.TASK),
+  TASK_FINISHED(HistoryLogLevel.TASK),
+  TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL),
+  TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL),
+  CONTAINER_LAUNCHED(HistoryLogLevel.ALL),
+  CONTAINER_STOPPED(HistoryLogLevel.ALL),
+  DAG_COMMIT_STARTED(HistoryLogLevel.DAG),
+  VERTEX_COMMIT_STARTED(HistoryLogLevel.VERTEX),
+  VERTEX_GROUP_COMMIT_STARTED(HistoryLogLevel.VERTEX),
+  VERTEX_GROUP_COMMIT_FINISHED(HistoryLogLevel.VERTEX),
+  DAG_RECOVERED(HistoryLogLevel.DAG);
+
+  private final HistoryLogLevel historyLogLevel;
+
+  private HistoryEventType(HistoryLogLevel historyLogLevel) {
+    this.historyLogLevel = historyLogLevel;
+  }
 
   public static boolean isDAGSpecificEvent(HistoryEventType historyEventType) {
     switch (historyEventType) {
@@ -77,6 +84,8 @@ public enum HistoryEventType {
     }
   }
 
-
-
+  public HistoryLogLevel getHistoryLogLevel() {
+    return historyLogLevel;
   }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
new file mode 100644
index 0000000..c8a076d
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.HistoryLogLevel;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Test;
+
+public class TestHistoryEventHandler {
+
+  private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+  private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+  private static String user = "TEST_USER";
+
+  @Test
+  public void testAll() {
+    testLogLevel(null, 6);
+    testLogLevel(HistoryLogLevel.NONE, 0);
+    testLogLevel(HistoryLogLevel.AM, 1);
+    testLogLevel(HistoryLogLevel.DAG, 3);
+    testLogLevel(HistoryLogLevel.VERTEX, 4);
+    testLogLevel(HistoryLogLevel.TASK, 5);
+    testLogLevel(HistoryLogLevel.ALL, 6);
+  }
+
+  @Test
+  public void testWithDAGRecovery() {
+    testLogLevelWithRecovery(null, 6);
+    testLogLevelWithRecovery(HistoryLogLevel.AM, 1);
+    testLogLevelWithRecovery(HistoryLogLevel.DAG, 3);
+    testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4);
+    testLogLevelWithRecovery(HistoryLogLevel.TASK, 5);
+    testLogLevelWithRecovery(HistoryLogLevel.ALL, 6);
+  }
+
+  @Test
+  public void testMultipleDag() {
+    testLogLevel(null, HistoryLogLevel.NONE, 7);
+    testLogLevel(null, HistoryLogLevel.AM, 7);
+    testLogLevel(null, HistoryLogLevel.DAG, 9);
+    testLogLevel(null, HistoryLogLevel.VERTEX, 10);
+    testLogLevel(null, HistoryLogLevel.TASK, 11);
+    testLogLevel(null, HistoryLogLevel.ALL, 12);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10);
+    testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0);
+  }
+
+  private void testLogLevelWithRecovery(HistoryLogLevel level, int expectedCount) {
+    HistoryEventHandler handler = createHandler(level);
+    InMemoryHistoryLoggingService.events.clear();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    List<DAGHistoryEvent> events = makeHistoryEvents(dagId, handler.getConfig());
+    events.set(1, new DAGHistoryEvent(dagId,
+        new DAGRecoveredEvent(attemptId, dagId, "test", user, 0, null)));
+    for (DAGHistoryEvent event : events) {
+      handler.handle(event);
+    }
+    assertEquals("Failed for level: " + level,
+        expectedCount, InMemoryHistoryLoggingService.events.size());
+    handler.stop();
+  }
+
+  private void testLogLevel(HistoryLogLevel level, int expectedCount) {
+    HistoryEventHandler handler = createHandler(level);
+    InMemoryHistoryLoggingService.events.clear();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId, handler.getConfig())) {
+      handler.handle(event);
+    }
+    assertEquals("Failed for level: " + level,
+        expectedCount, InMemoryHistoryLoggingService.events.size());
+    handler.stop();
+  }
+
+  private void testLogLevel(HistoryLogLevel defaultLogLevel, HistoryLogLevel dagLogLevel,
+      int expectedCount) {
+    HistoryEventHandler handler = createHandler(defaultLogLevel);
+    InMemoryHistoryLoggingService.events.clear();
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 1);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, handler.getConfig())) {
+      handler.handle(event);
+    }
+    TezDAGID dagId2 = TezDAGID.getInstance(appId, 2);
+    Configuration conf = new Configuration(handler.getConfig());
+    conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, dagLogLevel);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId2, conf)) {
+      handler.handle(event);
+    }
+
+    assertEquals(expectedCount, InMemoryHistoryLoggingService.events.size());
+    handler.stop();
+  }
+
+  public static class InMemoryHistoryLoggingService extends HistoryLoggingService {
+    public InMemoryHistoryLoggingService() {
+      super("InMemoryHistoryLoggingService");
+    }
+    static List<DAGHistoryEvent> events = new ArrayList<>();
+    @Override
+    public void handle(DAGHistoryEvent event) {
+      events.add(event);
+    }
+  }
+
+  private HistoryEventHandler createHandler(HistoryLogLevel logLevel) {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        InMemoryHistoryLoggingService.class.getName());
+    if (logLevel != null) {
+      conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, logLevel);
+    }
+
+    DAG dag = mock(DAG.class);
+    when(dag.getConf()).thenReturn(conf);
+
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+    when(appContext.getAMConf()).thenReturn(conf);
+    when(appContext.getCurrentDAG()).thenReturn(dag);
+
+    HistoryEventHandler handler =  new HistoryEventHandler(appContext);
+    handler.init(conf);
+
+    return handler;
+  }
+
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, Configuration inConf) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(inConf);
+    historyEvents.add(new DAGHistoryEvent(null,
+        new AMStartedEvent(attemptId, time, user)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
+            conf, null)));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
+            null, null)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null, null, user, "test", null,
+            attemptId, DAGPlan.getDefaultInstance())));
+    return historyEvents;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index 6f70bf5..6f653ad 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -19,20 +19,12 @@
 package org.apache.tez.dag.history.ats.acls;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Random;
 
-import javax.ws.rs.core.MediaType;
-
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,26 +38,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -76,15 +60,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-import org.mockito.Matchers;
-
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
 public class TestATSHistoryV15 {
@@ -103,8 +79,6 @@ public class TestATSHistoryV15 {
       + TestATSHistoryV15.class.getName() + "-tmpDir";
   private static Path atsActivePath;
 
-  private static String user;
-
   @BeforeClass
   public static void setup() throws IOException {
     try {
@@ -144,7 +118,6 @@ public class TestATSHistoryV15 {
         LOG.info("Failed to start Mini Tez Cluster", e);
       }
     }
-    user = UserGroupInformation.getCurrentUser().getShortUserName();
     timelineAddress = mrrTezCluster.getConfig().get(
         YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
     if (timelineAddress != null) {
@@ -167,7 +140,7 @@ public class TestATSHistoryV15 {
     }
   }
 
-  @Test (timeout=50000)
+  @Test(timeout=50000)
   public void testSimpleDAG() throws Exception {
     TezClient tezSession = null;
     ApplicationId applicationId;
@@ -211,24 +184,77 @@ public class TestATSHistoryV15 {
       assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
 
       // Verify HDFS data
-      int count = verifyATSDataOnHDFS(atsActivePath, 0, applicationId);
-      Assert.assertTrue("Count is: " + count, count > 0);
-
+      int count = verifyATSDataOnHDFS(atsActivePath, applicationId);
+      Assert.assertEquals("Count is: " + count, 2, count);
     } finally {
       if (tezSession != null) {
         tezSession.stop();
       }
     }
+  }
+
+  @Test
+  public void testATSLogLevelNone() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    String viewAcls = "nobody nobody_group";
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
 
+      tezConf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+          "TEZ_DAG_ID");
+
+      tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSV15HistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+      dag.setHistoryLogLevel(HistoryLogLevel.NONE);
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+      // Verify HDFS data
+      int count = verifyATSDataOnHDFS(atsActivePath, applicationId);
+      Assert.assertEquals("Count is: " + count, 1, count);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
   }
 
-  private int verifyATSDataOnHDFS(Path p, int count, ApplicationId applicationId) throws IOException {
+  private int verifyATSDataOnHDFS(Path p, ApplicationId applicationId) throws IOException {
+    int count = 0;
     RemoteIterator<LocatedFileStatus> iter = remoteFs.listFiles(p, true);
     while (iter.hasNext()) {
       LocatedFileStatus f = iter.next();
       LOG.info("Found file " + f.toString());
       if (f.isDirectory()) {
-        verifyATSDataOnHDFS(f.getPath(), count, applicationId);
+        count += verifyATSDataOnHDFS(f.getPath(), applicationId);
       } else {
         if (f.getPath().getName().contains(
             "" + applicationId.getClusterTimestamp() + "_" + applicationId.getId())) {
@@ -240,7 +266,7 @@ public class TestATSHistoryV15 {
   }
 
   @Test
-  public void testGetGroupId() {
+  public void testGetGroupId() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1000l, 1);
     TezDAGID dagid = TezDAGID.getInstance(appId, 1);
     for (final HistoryEventType eventType : HistoryEventType.values()) {
@@ -290,8 +316,8 @@ public class TestATSHistoryV15 {
         default:
           Assert.assertEquals(dagid.toString(), grpId.getTimelineEntityGroupId());
       }
+      service.close();
     }
   }
 
-
 }


Mime
View raw message