hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cur...@apache.org
Subject [28/49] hadoop git commit: YARN-6736. Consider writing to both ats v1 & v2 from RM for smoother upgrades. Contributed by Aaron Gresch.
Date Thu, 18 Jan 2018 23:47:59 GMT
YARN-6736. Consider writing to both ats v1 & v2 from RM for smoother upgrades. Contributed by Aaron Gresch.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d09058b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d09058b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d09058b2

Branch: refs/heads/YARN-7402
Commit: d09058b2fd18803d12f0835fdf78aef5e0b99c90
Parents: a0c71dc
Author: Rohith Sharma K S <rohithsharmaks@apache.org>
Authored: Tue Jan 16 07:58:29 2018 +0530
Committer: Rohith Sharma K S <rohithsharmaks@apache.org>
Committed: Tue Jan 16 07:58:29 2018 +0530

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  61 ++-
 .../distributedshell/ApplicationMaster.java     |  48 +-
 .../distributedshell/TestDSAppMaster.java       |  86 +++-
 .../yarn/client/api/impl/YarnClientImpl.java    |   8 +-
 .../client/api/impl/TimelineClientImpl.java     |   6 +-
 .../client/api/impl/TimelineV2ClientImpl.java   |   3 +-
 .../yarn/util/timeline/TimelineUtils.java       |   3 +-
 .../server/resourcemanager/ResourceManager.java |  44 +-
 .../metrics/CombinedSystemMetricsPublisher.java | 108 +++++
 .../resourcemanager/TestRMTimelineService.java  | 122 +++++
 .../TestCombinedSystemMetricsPublisher.java     | 476 +++++++++++++++++++
 .../hadoop/yarn/server/MiniYARNCluster.java     |  15 +-
 12 files changed, 912 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c892cfb..271b666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.conf;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -2271,6 +2273,9 @@ public class YarnConfiguration extends Configuration {
       + "version";
   public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
 
+  public static final String TIMELINE_SERVICE_VERSIONS =
+      TIMELINE_SERVICE_PREFIX + "versions";
+
   /**
    * Comma separated list of names for UIs hosted in the timeline server
    * (For pluggable UIs).
@@ -3636,8 +3641,60 @@ public class YarnConfiguration extends Configuration {
    * version greater than equal to 2 but smaller than 3.
    */
   public static boolean timelineServiceV2Enabled(Configuration conf) {
-    return timelineServiceEnabled(conf) &&
-        (int)getTimelineServiceVersion(conf) == 2;
+    boolean enabled = false;
+    if (timelineServiceEnabled(conf)) {
+      Collection<Float> versions = getTimelineServiceVersions(conf);
+      for (Float version : versions) {
+        if (version.intValue() == 2) {
+          enabled = true;
+          break;
+        }
+      }
+    }
+    return enabled;
+  }
+
+  /**
+   * Returns whether the timeline service v.1 is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service v.1 is enabled. V.1 refers to a
+   * version greater than equal to 1 but smaller than 2.
+   */
+  public static boolean timelineServiceV1Enabled(Configuration conf) {
+    boolean enabled = false;
+    if (timelineServiceEnabled(conf)) {
+      Collection<Float> versions = getTimelineServiceVersions(conf);
+      for (Float version : versions) {
+        if (version.intValue() == 1) {
+          enabled = true;
+          break;
+        }
+      }
+    }
+    return enabled;
+  }
+
+  /**
+   * Returns all the active timeline service versions. It does not check
+   * whether the timeline service itself is enabled.
+   *
+   * @param conf the configuration
+   * @return the timeline service versions as a collection of floats.
+   */
+  private static Collection<Float> getTimelineServiceVersions(
+      Configuration conf) {
+    String versions = conf.get(TIMELINE_SERVICE_VERSIONS);
+    if (versions == null) {
+      versions = Float.toString(getTimelineServiceVersion(conf));
+    }
+    List<String> stringList = Arrays.asList(versions.split(","));
+    List<Float> floatList = new ArrayList<Float>();
+    for (String s : stringList) {
+      Float f = Float.parseFloat(s);
+      floatList.add(f);
+    }
+    return floatList;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b35a2c9..bd810c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -323,7 +323,8 @@ public class ApplicationMaster {
   TimelineClient timelineClient;
 
   // Timeline v2 Client
-  private TimelineV2Client timelineV2Client;
+  @VisibleForTesting
+  TimelineV2Client timelineV2Client;
 
   static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
   static final String APPID_TIMELINE_FILTER_NAME = "appId";
@@ -632,11 +633,7 @@ public class ApplicationMaster {
     containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
         "container_retry_interval", "0"));
 
-    if (YarnConfiguration.timelineServiceEnabled(conf)) {
-      timelineServiceV2Enabled =
-          ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
-      timelineServiceV1Enabled = !timelineServiceV2Enabled;
-    } else {
+    if (!YarnConfiguration.timelineServiceEnabled(conf)) {
       timelineClient = null;
       timelineV2Client = null;
       LOG.warn("Timeline service is not enabled");
@@ -704,12 +701,11 @@ public class ApplicationMaster {
     if (timelineServiceV2Enabled) {
       // need to bind timelineClient
       amRMClient.registerTimelineV2Client(timelineV2Client);
-    }
-
-    if (timelineServiceV2Enabled) {
       publishApplicationAttemptEventOnTimelineServiceV2(
           DSEvent.DS_APP_ATTEMPT_START);
-    } else if (timelineServiceV1Enabled) {
+    }
+
+    if (timelineServiceV1Enabled) {
       publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
           DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
     }
@@ -784,18 +780,23 @@ public class ApplicationMaster {
         @Override
         public Void run() throws Exception {
           if (YarnConfiguration.timelineServiceEnabled(conf)) {
+            timelineServiceV1Enabled =
+                YarnConfiguration.timelineServiceV1Enabled(conf);
+            timelineServiceV2Enabled =
+                YarnConfiguration.timelineServiceV2Enabled(conf);
             // Creating the Timeline Client
+            if (timelineServiceV1Enabled) {
+              timelineClient = TimelineClient.createTimelineClient();
+              timelineClient.init(conf);
+              timelineClient.start();
+              LOG.info("Timeline service V1 client is enabled");
+            }
             if (timelineServiceV2Enabled) {
               timelineV2Client = TimelineV2Client.createTimelineClient(
                   appAttemptID.getApplicationId());
               timelineV2Client.init(conf);
               timelineV2Client.start();
               LOG.info("Timeline service V2 client is enabled");
-            } else {
-              timelineClient = TimelineClient.createTimelineClient();
-              timelineClient.init(conf);
-              timelineClient.start();
-              LOG.info("Timeline service V1 client is enabled");
             }
           } else {
             timelineClient = null;
@@ -825,12 +826,14 @@ public class ApplicationMaster {
       } catch (InterruptedException ex) {}
     }
 
+    if (timelineServiceV1Enabled) {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+    }
+
     if (timelineServiceV2Enabled) {
       publishApplicationAttemptEventOnTimelineServiceV2(
           DSEvent.DS_APP_ATTEMPT_END);
-    } else if (timelineServiceV1Enabled) {
-      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
     }
 
     // Join all launched threads
@@ -881,7 +884,8 @@ public class ApplicationMaster {
     // Stop Timeline Client
     if(timelineServiceV1Enabled) {
       timelineClient.stop();
-    } else if (timelineServiceV2Enabled) {
+    }
+    if (timelineServiceV2Enabled) {
       timelineV2Client.stop();
     }
 
@@ -947,7 +951,8 @@ public class ApplicationMaster {
           }
           publishContainerEndEventOnTimelineServiceV2(containerStatus,
               containerStartTime);
-        } else if (timelineServiceV1Enabled) {
+        }
+        if (timelineServiceV1Enabled) {
           publishContainerEndEvent(timelineClient, containerStatus, domainId,
               appSubmitterUgi);
         }
@@ -1113,7 +1118,8 @@ public class ApplicationMaster {
         applicationMaster.getContainerStartTimes().put(containerId, startTime);
         applicationMaster.publishContainerStartEventOnTimelineServiceV2(
             container, startTime);
-      } else if (applicationMaster.timelineServiceV1Enabled) {
+      }
+      if (applicationMaster.timelineServiceV1Enabled) {
         applicationMaster.publishContainerStartEvent(
             applicationMaster.timelineClient, container,
             applicationMaster.domainId, applicationMaster.appSubmitterUgi);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index 2789d04..f11bdf8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -167,14 +171,82 @@ public class TestDSAppMaster {
   }
 
   @Test
-  public void testTimelineClientInDSAppMaster() throws Exception {
+  public void testTimelineClientInDSAppMasterV1() throws Exception {
+    runTimelineClientInDSAppMaster(true, false);
+  }
+
+  @Test
+  public void testTimelineClientInDSAppMasterV2() throws Exception {
+    runTimelineClientInDSAppMaster(false, true);
+  }
+
+  @Test
+  public void testTimelineClientInDSAppMasterV1V2() throws Exception {
+    runTimelineClientInDSAppMaster(true, true);
+  }
+
+  @Test
+  public void testTimelineClientInDSAppMasterDisabled() throws Exception {
+    runTimelineClientInDSAppMaster(false, false);
+  }
+
+  private void runTimelineClientInDSAppMaster(boolean v1Enabled,
+      boolean v2Enabled) throws Exception {
+    ApplicationMaster appMaster = createAppMasterWithStartedTimelineService(
+        v1Enabled, v2Enabled);
+    validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster);
+  }
+
+  private void validateAppMasterTimelineService(boolean v1Enabled,
+      boolean v2Enabled, ApplicationMaster appMaster) {
+    if (v1Enabled) {
+      Assert.assertEquals(appMaster.appSubmitterUgi,
+          ((TimelineClientImpl)appMaster.timelineClient).getUgi());
+    } else {
+      Assert.assertNull(appMaster.timelineClient);
+    }
+    if (v2Enabled) {
+      Assert.assertNotNull(appMaster.timelineV2Client);
+    } else {
+      Assert.assertNull(appMaster.timelineV2Client);
+    }
+  }
+
+  private ApplicationMaster createAppMasterWithStartedTimelineService(
+      boolean v1Enabled, boolean v2Enabled) throws Exception {
     ApplicationMaster appMaster = new ApplicationMaster();
-    appMaster.appSubmitterUgi =
-        UserGroupInformation.createUserForTesting("foo", new String[]{"bar"});
-    Configuration conf = new YarnConfiguration();
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    appMaster.appSubmitterUgi = UserGroupInformation
+        .createUserForTesting("foo", new String[] {"bar"});
+    Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled);
+    ApplicationId appId = ApplicationId.newInstance(1L, 1);
+    appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1);
     appMaster.startTimelineClient(conf);
-    Assert.assertEquals(appMaster.appSubmitterUgi,
-        ((TimelineClientImpl)appMaster.timelineClient).getUgi());
+    return appMaster;
+  }
+
+  private Configuration getTimelineServiceConf(boolean v1Enabled,
+      boolean v2Enabled) {
+    Configuration conf = new YarnConfiguration(new Configuration(false));
+    Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
+
+    if (v1Enabled || v2Enabled) {
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    }
+
+    if (v1Enabled) {
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    }
+
+    if (v2Enabled) {
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+          FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    }
+
+    if (v1Enabled && v2Enabled) {
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+    }
+    return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 9a9978d..072e606 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -177,13 +177,7 @@ public class YarnClientImpl extends YarnClient {
         YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
     }
 
-    float timelineServiceVersion =
-        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
-    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-        && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
-            || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+    if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
       timelineV1ServiceEnabled = true;
       timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
       timelineService = TimelineUtils.buildTimelineTokenService(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index f49618b..44d6d48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -99,14 +99,12 @@ public class TimelineClientImpl extends TimelineClient {
     timelineServiceVersion =
         conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
             YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
-    LOG.info("Timeline service address: " + getTimelineServiceAddress());
-    if (!YarnConfiguration.timelineServiceEnabled(conf)
-        || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
-            || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+    if (!YarnConfiguration.timelineServiceV1Enabled(conf)) {
       throw new IOException("Timeline V1 client is not properly configured. "
           + "Either timeline service is not enabled or version is not set to"
           + " 1.x");
     }
+    LOG.info("Timeline service address: " + getTimelineServiceAddress());
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUgi = ugi.getRealUser();
     if (realUgi != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
index 220d6af..02c9519 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -94,8 +94,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
-    if (!YarnConfiguration.timelineServiceEnabled(conf)
-        || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
       throw new IOException("Timeline V2 client is not properly configured. "
           + "Either timeline service is not enabled or version is not set to"
           + " 2");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 3b12f3c..a0c4b72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -118,7 +118,8 @@ public class TimelineUtils {
   }
 
   /**
-   * Returns whether the timeline service v.1.5 is enabled via configuration.
+   * Returns whether the timeline service v.1.5 is enabled by default via
+   * configuration.
    *
    * @param conf the configuration
    * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index a0317f6..8641842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@@ -513,26 +514,33 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    SystemMetricsPublisher publisher;
-    if (YarnConfiguration.timelineServiceEnabled(conf) &&
-        YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
-      if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-        // we're dealing with the v.2.x publisher
-        LOG.info("system metrics publisher with the timeline service V2 is " +
-            "configured");
-        publisher = new TimelineServiceV2Publisher(
-            rmContext.getRMTimelineCollectorManager());
-      } else {
-        // we're dealing with the v.1.x publisher
-        LOG.info("system metrics publisher with the timeline service V1 is " +
-            "configured");
-        publisher = new TimelineServiceV1Publisher();
-      }
-    } else {
+    List<SystemMetricsPublisher> publishers =
+        new ArrayList<SystemMetricsPublisher>();
+    if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
+      SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
+      publishers.add(publisherV1);
+    }
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      // we're dealing with the v.2.x publisher
+      LOG.info("system metrics publisher with the timeline service V2 is "
+          + "configured");
+      SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
+          rmContext.getRMTimelineCollectorManager());
+      publishers.add(publisherV2);
+    }
+    if (publishers.isEmpty()) {
       LOG.info("TimelineServicePublisher is not configured");
-      publisher = new NoOpSystemMetricPublisher();
+      SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
+      publishers.add(noopPublisher);
+    }
+
+    for (SystemMetricsPublisher publisher : publishers) {
+      addIfService(publisher);
     }
-    return publisher;
+
+    SystemMetricsPublisher combinedPublisher =
+        new CombinedSystemMetricsPublisher(publishers);
+    return combinedPublisher;
   }
 
   // sanity check for configurations

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
new file mode 100644
index 0000000..9646747
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * A metrics publisher that can publish for a collection of publishers.
+ */
+public class CombinedSystemMetricsPublisher implements SystemMetricsPublisher {
+  private Collection<SystemMetricsPublisher> publishers;
+
+  public CombinedSystemMetricsPublisher(Collection<SystemMetricsPublisher>
+      publishers) {
+    this.publishers = publishers;
+  }
+
+  @Override
+  public void appCreated(RMApp app, long createdTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appCreated(app, createdTime);
+    }
+  }
+
+  @Override
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appACLsUpdated(app, appViewACLs, updatedTime);
+    }
+  }
+
+  @Override
+  public void appUpdated(RMApp app, long updatedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appUpdated(app, updatedTime);
+    }
+  }
+
+  @Override
+  public void appStateUpdated(RMApp app, YarnApplicationState appState,
+      long updatedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appStateUpdated(app, appState, updatedTime);
+    }
+  }
+
+  @Override
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appFinished(app, state, finishedTime);
+    }
+  }
+
+  @Override
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appAttemptRegistered(appAttempt, registeredTime);
+    }
+  }
+
+  @Override
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.appAttemptFinished(appAttempt, appAttemtpState, app,
+          finishedTime);
+    }
+  }
+
+  @Override
+  public void containerCreated(RMContainer container, long createdTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.containerCreated(container, createdTime);
+    }
+  }
+
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+    for (SystemMetricsPublisher publisher : this.publishers) {
+      publisher.containerFinished(container, finishedTime);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
new file mode 100644
index 0000000..f824fa1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that the RM creates timeline services (v1/v2) as specified by the
+ * configuration.
+ */
+public class TestRMTimelineService {
+  private static MockRM rm;
+
+  private void setup(boolean v1Enabled, boolean v2Enabled) {
+    Configuration conf = new YarnConfiguration(new Configuration(false));
+    Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
+
+    if (v1Enabled || v2Enabled) {
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    }
+
+    if (v1Enabled) {
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    }
+
+    if (v2Enabled) {
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+          FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    }
+
+    if (v1Enabled && v2Enabled) {
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+    }
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    rm = new MockRM(conf, memStore);
+    rm.start();
+  }
+
+  // validate RM services exist or not as we specified
+  private void validate(boolean v1Enabled, boolean v2Enabled) {
+    boolean v1PublisherServiceFound = false;
+    boolean v2PublisherServiceFound = false;
+    List<Service> services = rm.getServices();
+    for (Service service : services) {
+      if (service instanceof TimelineServiceV1Publisher) {
+        v1PublisherServiceFound = true;
+      } else if (service instanceof TimelineServiceV2Publisher) {
+        v2PublisherServiceFound = true;
+      }
+    }
+
+    Assert.assertEquals(v1Enabled, v1PublisherServiceFound);
+    Assert.assertEquals(v2Enabled, v2PublisherServiceFound);
+  }
+
+  private void cleanup() throws Exception {
+    rm.close();
+    rm.stop();
+  }
+
+  // runs test to validate RM creates a timeline service publisher if and
+  // only if the service is enabled for v1 and v2 (independently).
+  private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
+    setup(v1Enabled, v2Enabled);
+    validate(v1Enabled, v2Enabled);
+    cleanup();
+  }
+
+  @Test
+  public void testTimelineServiceV1V2Enabled() throws Exception {
+    runTest(true, true);
+  }
+
+  @Test
+  public void testTimelineServiceV1Enabled() throws Exception {
+    runTest(true, false);
+  }
+
+  @Test
+  public void testTimelineServiceV2Enabled() throws Exception {
+    runTest(false, true);
+  }
+
+  @Test
+  public void testTimelineServiceDisabled() throws Exception {
+    runTest(false, false);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
new file mode 100644
index 0000000..830d01c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
+import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline
+ * services (v1/v2) as specified by the configuration.
+ */
+public class TestCombinedSystemMetricsPublisher {
+  /**
+    * The folder where the FileSystemTimelineWriterImpl writes the entities.
+    */
+  private static File testRootDir = new File("target",
+      TestCombinedSystemMetricsPublisher.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  private static ApplicationHistoryServer timelineServer;
+  private static CombinedSystemMetricsPublisher metricsPublisher;
+  private static TimelineStore store;
+  private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+  private static RMTimelineCollectorManager rmTimelineCollectorManager;
+  private static DrainDispatcher dispatcher;
+  private static YarnConfiguration conf;
+  private static TimelineServiceV1Publisher publisherV1;
+  private static TimelineServiceV2Publisher publisherV2;
+  private static ApplicationAttemptId appAttemptId;
+  private static RMApp app;
+
+  private void testSetup(boolean enableV1, boolean enableV2) throws Exception {
+
+    if (testRootDir.exists()) {
+      //cleanup before hand
+      FileContext.getLocalFSFileContext().delete(
+              new Path(testRootDir.getAbsolutePath()), true);
+    }
+
+    conf = getConf(enableV1, enableV2);
+
+    RMContext rmContext = mock(RMContext.class);
+    rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+    when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
+    ResourceManager rm = mock(ResourceManager.class);
+    when(rm.getRMContext()).thenReturn(rmContext);
+
+    if (enableV2) {
+      dispatcher = new DrainDispatcher();
+      rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
+      when(rmContext.getRMTimelineCollectorManager()).thenReturn(
+          rmTimelineCollectorManager);
+
+      rmTimelineCollectorManager.init(conf);
+      rmTimelineCollectorManager.start();
+    } else {
+      dispatcher = null;
+      rmTimelineCollectorManager = null;
+    }
+
+    timelineServer = new ApplicationHistoryServer();
+    timelineServer.init(conf);
+    timelineServer.start();
+    store = timelineServer.getTimelineStore();
+
+    if (enableV2) {
+      dispatcher.init(conf);
+      dispatcher.start();
+    }
+
+    List<SystemMetricsPublisher> publishers =
+        new ArrayList<SystemMetricsPublisher>();
+
+    if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
+      Assert.assertTrue(enableV1);
+      publisherV1 = new TimelineServiceV1Publisher();
+      publishers.add(publisherV1);
+      publisherV1.init(conf);
+      publisherV1.start();
+    } else {
+      Assert.assertFalse(enableV1);
+      publisherV1 = null;
+    }
+
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      Assert.assertTrue(enableV2);
+      publisherV2 = new TimelineServiceV2Publisher(
+          rmTimelineCollectorManager) {
+        @Override
+        protected Dispatcher getDispatcher() {
+          return dispatcher;
+        }
+      };
+      publishers.add(publisherV2);
+      publisherV2.init(conf);
+      publisherV2.start();
+    } else {
+      Assert.assertFalse(enableV2);
+      publisherV2 = null;
+    }
+
+    if (publishers.isEmpty()) {
+      NoOpSystemMetricPublisher noopPublisher =
+          new NoOpSystemMetricPublisher();
+      publishers.add(noopPublisher);
+    }
+
+    metricsPublisher = new CombinedSystemMetricsPublisher(publishers);
+  }
+
+  private void testCleanup() throws Exception {
+    if (publisherV1 != null) {
+      publisherV1.stop();
+    }
+    if (publisherV2 != null) {
+      publisherV2.stop();
+    }
+    if (timelineServer != null) {
+      timelineServer.stop();
+    }
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+    if (rmTimelineCollectorManager != null) {
+      rmTimelineCollectorManager.stop();
+    }
+  }
+
+  private static YarnConfiguration getConf(boolean v1Enabled,
+      boolean v2Enabled) {
+    YarnConfiguration yarnConf = new YarnConfiguration();
+
+    if (v1Enabled || v2Enabled) {
+      yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    } else {
+      yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+    }
+
+    if (v1Enabled) {
+      yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+      yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
+          MemoryTimelineStore.class, TimelineStore.class);
+      yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
+          MemoryTimelineStateStore.class, TimelineStateStore.class);
+    }
+
+    if (v2Enabled) {
+      yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
+      yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+          true);
+      yarnConf.setBoolean(
+          YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true);
+      yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+          FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+
+      try {
+        yarnConf.set(
+            FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+                testRootDir.getCanonicalPath());
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail("Exception while setting the " +
+            "TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
+      }
+    }
+
+    if (v1Enabled && v2Enabled) {
+      yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+      yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+    }
+
+    yarnConf.setInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
+
+    return yarnConf;
+  }
+
+  // runs test to validate timeline events are published if and only if the
+  // service is enabled for v1 and v2 (independently).
+  private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
+    testSetup(v1Enabled, v2Enabled);
+    publishEvents(v1Enabled, v2Enabled);
+    validateV1(v1Enabled);
+    validateV2(v2Enabled);
+    testCleanup();
+  }
+
+  @Test(timeout = 10000)
+  public void testTimelineServiceEventPublishingV1V2Enabled()
+      throws Exception {
+    runTest(true, true);
+  }
+
+  @Test(timeout = 10000)
+  public void testTimelineServiceEventPublishingV1Enabled() throws Exception {
+    runTest(true, false);
+  }
+
+  @Test(timeout = 10000)
+  public void testTimelineServiceEventPublishingV2Enabled() throws Exception {
+    runTest(false, true);
+  }
+
+  @Test(timeout = 10000)
+  public void testTimelineServiceEventPublishingNoService() throws Exception {
+    runTest(false, false);
+  }
+
+  private void publishEvents(boolean v1Enabled, boolean v2Enabled) {
+    long timestamp = (v1Enabled) ? 1 : 2;
+    int id = (v2Enabled) ? 3 : 4;
+    ApplicationId appId = ApplicationId.newInstance(timestamp, id);
+
+    app = createRMApp(appId);
+    rmAppsMapInContext.putIfAbsent(appId, app);
+
+    if (v2Enabled) {
+      AppLevelTimelineCollector collector =
+          new AppLevelTimelineCollector(appId);
+      rmTimelineCollectorManager.putIfAbsent(appId, collector);
+    }
+    appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    RMAppAttempt appAttempt = createRMAppAttempt(true);
+
+    metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
+    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
+        app, Integer.MAX_VALUE + 2L);
+    if (v2Enabled) {
+      dispatcher.await();
+    }
+  }
+
+  private void validateV1(boolean v1Enabled) throws Exception {
+    TimelineEntity entity = null;
+
+    if (!v1Enabled) {
+      Thread.sleep(1000);
+      entity =
+          store.getEntity(appAttemptId.toString(),
+              AppAttemptMetricsConstants.ENTITY_TYPE,
+              EnumSet.allOf(Field.class));
+      Assert.assertNull(entity);
+      return;
+    }
+
+    do {
+      entity =
+          store.getEntity(appAttemptId.toString(),
+              AppAttemptMetricsConstants.ENTITY_TYPE,
+              EnumSet.allOf(Field.class));
+      Thread.sleep(100);
+      // ensure two events are both published before leaving the loop
+    } while (entity == null || entity.getEvents().size() < 2);
+
+    boolean hasRegisteredEvent = false;
+    boolean hasFinishedEvent = false;
+    for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event :
+        entity.getEvents()) {
+      if (event.getEventType().equals(
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
+        hasRegisteredEvent = true;
+      } else if (event.getEventType().equals(
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
+        hasFinishedEvent = true;
+        Assert.assertEquals(
+            FinalApplicationStatus.UNDEFINED.toString(),
+            event.getEventInfo().get(
+                AppAttemptMetricsConstants.FINAL_STATUS_INFO));
+        Assert.assertEquals(
+            YarnApplicationAttemptState.FINISHED.toString(),
+            event.getEventInfo().get(
+                AppAttemptMetricsConstants.STATE_INFO));
+      }
+      Assert
+      .assertEquals(appAttemptId.toString(), entity.getEntityId());
+    }
+    Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
+  }
+
+  private void validateV2(boolean v2Enabled) throws Exception {
+    String outputDirApp =
+        getTimelineEntityDir() + "/"
+            + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
+
+    File entityFolder = new File(outputDirApp);
+    Assert.assertEquals(v2Enabled, entityFolder.isDirectory());
+
+    if (v2Enabled) {
+      String timelineServiceFileName = appAttemptId.toString()
+          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File entityFile = new File(outputDirApp, timelineServiceFileName);
+      Assert.assertTrue(entityFile.exists());
+      long idPrefix = TimelineServiceHelper
+          .invertLong(appAttemptId.getAttemptId());
+      verifyEntity(entityFile, 2,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix);
+    }
+  }
+
+  private void verifyEntity(File entityFile, long expectedEvents,
+      String eventForCreatedTime, long expectedMetrics, long idPrefix)
+          throws IOException {
+
+    BufferedReader reader = null;
+    String strLine;
+    long count = 0;
+    long metricsCount = 0;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().length() > 0) {
+          org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+              entity = FileSystemTimelineReaderImpl
+              .getTimelineRecordFromJSON(strLine.trim(),
+                  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
+          metricsCount = entity.getMetrics().size();
+          assertEquals(idPrefix, entity.getIdPrefix());
+          for (TimelineEvent event : entity.getEvents()) {
+            if (event.getId().equals(eventForCreatedTime)) {
+              assertTrue(entity.getCreatedTime() > 0);
+              break;
+            }
+          }
+          count++;
+        }
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals("Expected " + expectedEvents + " events to be published",
+        expectedEvents, count);
+    assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
+        expectedMetrics, metricsCount);
+  }
+
+  private String getTimelineEntityDir() {
+    String outputDirApp =
+        testRootDir.getAbsolutePath() + "/"
+            + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+            + app.getUser() + "/"
+            + app.getName() + "/"
+            + TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+            + app.getStartTime() + "/"
+            + app.getApplicationId();
+    return outputDirApp;
+  }
+
+  private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) {
+    RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+    when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(appAttempt.getHost()).thenReturn("test host");
+    when(appAttempt.getRpcPort()).thenReturn(-100);
+    if (!unmanagedAMAttempt) {
+      Container container = mock(Container.class);
+      when(container.getId())
+          .thenReturn(ContainerId.newContainerId(appAttemptId, 1));
+      when(appAttempt.getMasterContainer()).thenReturn(container);
+    }
+    when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+    when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
+    when(appAttempt.getOriginalTrackingUrl()).thenReturn(
+        "test original tracking url");
+    return appAttempt;
+  }
+
+  private static RMApp createRMApp(ApplicationId appId) {
+    RMApp rmApp = mock(RMAppImpl.class);
+    when(rmApp.getApplicationId()).thenReturn(appId);
+    when(rmApp.getName()).thenReturn("test app");
+    when(rmApp.getApplicationType()).thenReturn("test app type");
+    when(rmApp.getUser()).thenReturn("testUser");
+    when(rmApp.getQueue()).thenReturn("test queue");
+    when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
+    when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
+    when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
+    when(rmApp.getDiagnostics()).thenReturn(
+        new StringBuilder("test diagnostics info"));
+    RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+    when(appAttempt.getAppAttemptId()).thenReturn(
+        ApplicationAttemptId.newInstance(appId, 1));
+    when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt);
+    when(rmApp.getFinalApplicationStatus()).thenReturn(
+        FinalApplicationStatus.UNDEFINED);
+    Map<String, Long> resourceMap = new HashMap<>();
+    resourceMap
+        .put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
+    resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
+    Map<String, Long> preemptedMap = new HashMap<>();
+    preemptedMap
+        .put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
+    preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
+    when(rmApp.getRMAppMetrics()).thenReturn(
+        new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
+            preemptedMap));
+    when(rmApp.getApplicationTags()).thenReturn(
+        Collections.<String> emptySet());
+    ApplicationSubmissionContext appSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(appSubmissionContext.getPriority())
+        .thenReturn(Priority.newInstance(0));
+
+    ContainerLaunchContext containerLaunchContext =
+        mock(ContainerLaunchContext.class);
+    when(containerLaunchContext.getCommands())
+        .thenReturn(Collections.singletonList("java -Xmx1024m"));
+    when(appSubmissionContext.getAMContainerSpec())
+        .thenReturn(containerLaunchContext);
+    when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10));
+    when(rmApp.getApplicationSubmissionContext())
+        .thenReturn(appSubmissionContext);
+    return rmApp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 8245fd6..cbb69f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -251,6 +251,15 @@ public class MiniYARNCluster extends CompositeService {
     useFixedPorts = conf.getBoolean(
         YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+
+    if (!useFixedPorts) {
+      String hostname = MiniYARNCluster.getHostname();
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
+
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+          hostname + ":" + ServerSocketUtil.getPort(9188, 10));
+    }
+
     useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
     failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
@@ -808,12 +817,6 @@ public class MiniYARNCluster extends CompositeService {
       }
       conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
           MemoryTimelineStateStore.class, TimelineStateStore.class);
-      if (!useFixedPorts) {
-        String hostname = MiniYARNCluster.getHostname();
-        conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
-        conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-            hostname + ":" + ServerSocketUtil.getPort(9188, 10));
-      }
       appHistoryServer.init(conf);
       super.serviceInit(conf);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message