tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [01/35] tez git commit: TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. (Chang Li via hitesh)
Date Tue, 07 Apr 2015 20:12:19 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 7f91bd878 -> 7522e60b5 (forced update)


TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. (Chang Li via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a4f6dbce
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a4f6dbce
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a4f6dbce

Branch: refs/heads/TEZ-2003
Commit: a4f6dbcecec8c084f10a0879a2960f140879618f
Parents: 765adfb
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Mar 26 17:20:10 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Mar 26 17:20:10 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../ats/acls/ATSHistoryACLPolicyManager.java    | 24 ++++++++--
 .../ats/acls/TestATSHistoryWithACLs.java        | 16 +++++++
 .../logging/ats/ATSHistoryLoggingService.java   | 28 +++++++++--
 .../ats/TestATSHistoryLoggingService.java       | 50 ++++++++++++++++++++
 5 files changed, 112 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f07257..bd7f337 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false.
   TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018
   TEZ-2064. SessionNotRunning Exception not thrown is all cases
   TEZ-2189. Tez UI live AM tracking url only works for localhost addresses

http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
index 4b4487b..23d3558 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -38,6 +38,7 @@ import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.common.security.ACLType;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 
@@ -51,6 +52,8 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
   Configuration conf;
   String user;
   final static String DOMAIN_ID_PREFIX = "Tez_ATS_";
+  private static final String atsHistoryLoggingServiceClassName =
+      "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
 
   private void initializeTimelineClient() {
     if (this.conf == null) {
@@ -60,9 +63,20 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
       this.timelineClient.stop();
       this.timelineClient = null;
     }
-    this.timelineClient = TimelineClient.createTimelineClient();
-    this.timelineClient.init(this.conf);
-    this.timelineClient.start();
+    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+      this.timelineClient = TimelineClient.createTimelineClient();
+      this.timelineClient.init(this.conf);
+      this.timelineClient.start();
+    } else {
+      this.timelineClient = null;
+      if (conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+         .equals(atsHistoryLoggingServiceClassName)) {
+        LOG.warn(atsHistoryLoggingServiceClassName
+            + " is disabled due to Timeline Service being disabled, "
+            + YarnConfiguration.TIMELINE_SERVICE_ENABLED + " set to false");
+      }
+    }
     try {
       this.user = UserGroupInformation.getCurrentUser().getShortUserName();
     } catch (IOException e) {
@@ -110,7 +124,9 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
     timelineDomain.setWriters(user);
 
     try {
-      timelineClient.putDomain(timelineDomain);
+      if (timelineClient != null) {
+        timelineClient.putDomain(timelineDomain);
+      }
     } catch (Exception e) {
       LOG.warn("Could not post timeline domain", e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 30c6dda..c03ce27 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -56,6 +57,7 @@ import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.apache.tez.tests.MiniTezClusterWithTimeline;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -295,6 +297,20 @@ public class TestATSHistoryWithACLs {
     verifyEntityDomains(applicationId, false);
   }
 
+  private static final String atsHistoryACLManagerClassName =
+      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
+  @Test (timeout=50000)
+  public void testTimelineServiceDisabled() throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,false);
+    ATSHistoryACLPolicyManager historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+        atsHistoryACLManagerClassName);
+    historyACLPolicyManager.setConf(tezConf);
+    Assert.assertNull(historyACLPolicyManager.timelineClient);
+  }
+
   private void verifyEntityDomains(ApplicationId applicationId, boolean sameDomain) {
     assertNotNull(timelineAddress);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 11f38b9..b2ac1bb 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelineP
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -72,6 +73,8 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
   private long maxPollingTimeMillis;
 
   private String sessionDomainId;
+  private static final String atsHistoryLoggingServiceClassName =
+      "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
   private HistoryACLPolicyManager historyACLPolicyManager;
@@ -83,8 +86,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     LOG.info("Initializing ATSService");
-    timelineClient = TimelineClient.createTimelineClient();
-    timelineClient.init(conf);
+    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+      timelineClient = TimelineClient.createTimelineClient();
+      timelineClient.init(conf);
+    } else {
+      this.timelineClient = null;
+      if (conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+        .equals(atsHistoryLoggingServiceClassName)) {
+        LOG.warn(atsHistoryLoggingServiceClassName
+            + " is disabled due to Timeline Service being disabled, "
+            + YarnConfiguration.TIMELINE_SERVICE_ENABLED + " set to false");
+      }
+    }
     maxTimeToWaitOnShutdown = conf.getLong(
         TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
         TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
@@ -118,6 +132,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStart() {
+    if (timelineClient == null) {
+      return;
+    }
     LOG.info("Starting ATSService");
     timelineClient.start();
 
@@ -169,6 +186,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStop() {
+    if (timelineClient == null) {
+      return;
+    }
     LOG.info("Stopping ATSService"
         + ", eventQueueBacklog=" + eventQueue.size());
     stopped.set(true);
@@ -233,7 +253,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
 
   public void handle(DAGHistoryEvent event) {
-    eventQueue.add(event);
+    if (timelineClient != null) {
+      eventQueue.add(event);
+    }
   }
 
   private boolean isValidEvent(DAGHistoryEvent event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 7d1abbd..464864e 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -142,4 +143,53 @@ public class TestATSHistoryLoggingService {
     Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter);
   }
 
+  @Test(timeout=20000)
+  public void testTimelineServiceDisable() throws Exception {
+    ATSHistoryLoggingService atsHistoryLoggingService1;
+    AppContext appContext1;
+    appContext1 = mock(AppContext.class);
+    atsHistoryLoggingService1 = new ATSHistoryLoggingService();
+
+    atsHistoryLoggingService1.setAppContext(appContext);
+    atsHistoryLoggingService1.timelineClient = mock(TimelineClient.class);
+    when(atsHistoryLoggingService1.timelineClient.putEntities(
+      Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
+      new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        ++atsInvokeCounter;
+        atsEntitiesCounter += invocation.getArguments().length;
+        try {
+          Thread.sleep(500l);
+        } catch (InterruptedException e) {
+          // do nothing
+        }
+        return null;
+      }
+    });
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+      ATSHistoryLoggingService.class.getName());
+    atsHistoryLoggingService1.init(conf);
+    atsHistoryLoggingService1.start();
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+         ApplicationId.newInstance(100l, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+    new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
+    for (int i = 0; i < 100; ++i) {
+      atsHistoryLoggingService1.handle(historyEvent);
+    }
+
+    try {
+        Thread.sleep(1000l);
+    } catch (InterruptedException e) {
+        // Do nothing
+    }
+    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
+         + ", timelineInvocations=" + atsInvokeCounter);
+    Assert.assertEquals(atsInvokeCounter, 0);
+    Assert.assertEquals(atsEntitiesCounter, 0);
+    Assert.assertNull(atsHistoryLoggingService1.timelineClient);
+    
+  }
 }


Mime
View raw message