hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [13/15] hadoop git commit: YARN-5646. Add documentation and update config parameter names for scheduling of OPPORTUNISTIC containers. (Konstantinos Karanasos via asuresh)
Date Fri, 06 Jan 2017 19:34:43 GMT
YARN-5646. Add documentation and update config parameter names for scheduling of OPPORTUNISTIC containers. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 2273a74c1f3895163046cca09ff5e983df301d22)
(cherry picked from commit 9e17ffe599e12c59c48f4355de645c31a4735d04)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b4d3e85
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b4d3e85
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b4d3e85

Branch: refs/heads/branch-2
Commit: 2b4d3e8506a36cb21c347d0bb455be28537ec023
Parents: 3d3bb30
Author: Arun Suresh <asuresh@apache.org>
Authored: Fri Dec 16 08:14:34 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Fri Jan 6 11:15:10 2017 -0800

 .../v2/app/rm/RMContainerAllocator.java         |   4 +-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +-
 .../hadoop/mapred/TestMROpportunisticMaps.java  |   2 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  59 ++---
 .../api/impl/TestDistributedScheduling.java     |   1 -
 .../src/main/resources/yarn-default.xml         |  28 +--
 .../yarn/server/nodemanager/NodeManager.java    |   2 +-
 .../nodemanager/amrmproxy/AMRMProxyService.java |   2 +-
 .../containermanager/ContainerManagerImpl.java  |   4 +-
 .../scheduler/ContainerScheduler.java           |   2 +-
 ...pportunisticContainerAllocatorAMService.java |  16 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +-
 .../hadoop/yarn/server/MiniYARNCluster.java     |   8 +-
 .../site/markdown/OpportunisticContainers.md    | 225 +++++++++++++++++++
 14 files changed, 296 insertions(+), 65 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index d2299d0..45c2a0e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -231,8 +231,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
-        conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
     LOG.info(this.scheduledRequests.getNumOpportunisticMapsPer100() +
         "% of the mappers will be scheduled using OPPORTUNISTIC containers");

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index e046c66..140b9ff 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -988,9 +988,9 @@ public interface MRJobConfig {
    * requested by the AM will be opportunistic. If the total number of maps
    * for the job is less than 'x', then ALL maps will be OPPORTUNISTIC
-  public static final String MR_NUM_OPPORTUNISTIC_MAPS_PER_100 =
-      "mapreduce.job.num-opportunistic-maps-per-100";
-  public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100 = 0;
+  public static final String MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE =
+      "mapreduce.job.num-opportunistic-maps-percentage";
    * A comma-separated list of properties whose value will be redacted.

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
index d975fd0..462ff04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
@@ -145,7 +145,7 @@ public class TestMROpportunisticMaps {
-    job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, percent);
+    job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE, percent);
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7a3ecf2..5245d5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -297,69 +297,65 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
-  /** Setting that controls whether distributed scheduling is enabled or not. */
-  public static final String DIST_SCHEDULING_ENABLED =
-      YARN_PREFIX + "distributed-scheduling.enabled";
-  public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
   /** Setting that controls whether opportunistic container allocation
    *  is enabled or not. */
-      YARN_PREFIX + "opportunistic-container-allocation.enabled";
+      RM_PREFIX + "opportunistic-container-allocation.enabled";
   public static final boolean
   /** Number of nodes to be used by the Opportunistic Container allocator for
    * dispatching containers during container allocation. */
-      YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
+      RM_PREFIX + "opportunistic-container-allocation.nodes-used";
   /** Frequency for computing least loaded NMs. */
-      YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
+      RM_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
   public static final long
-  /** Comparator for determining node load for Distributed Scheduling. */
+  /** Comparator for determining node load for scheduling of opportunistic
+   * containers. */
   public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
-      YARN_PREFIX + "nm-container-queuing.load-comparator";
+      RM_PREFIX + "nm-container-queuing.load-comparator";
   /** Value of standard deviation used for calculation of queue limit
    * thresholds. */
   public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV =
-      YARN_PREFIX + "nm-container-queuing.queue-limit-stdev";
-  public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
+      RM_PREFIX + "nm-container-queuing.queue-limit-stdev";
+  public static final float DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV =
   /** Min length of container queue at NodeManager. This is a cluster-wide
    * configuration that acts as the lower-bound of optimal queue length
    * calculated by the NodeQueueLoadMonitor */
   public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
-      YARN_PREFIX + "nm-container-queuing.min-queue-length";
+      RM_PREFIX + "nm-container-queuing.min-queue-length";
   /** Max length of container queue at NodeManager. This is a cluster-wide
    * configuration that acts as the upper-bound of optimal queue length
    * calculated by the NodeQueueLoadMonitor */
   public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
-      YARN_PREFIX + "nm-container-queuing.max-queue-length";
-  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
+      RM_PREFIX + "nm-container-queuing.max-queue-length";
+  public static final int DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = 15;
   /** Min queue wait time for a container at a NodeManager. */
   public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
-      YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
-      1;
+      RM_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
+      10;
   /** Max queue wait time for a container queue at a NodeManager. */
   public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
-      YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
-      10;
+      RM_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
+      100;
    * Enable/disable intermediate-data encryption at YARN level. For now, this
@@ -768,9 +764,14 @@ public class YarnConfiguration extends Configuration {
   /** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM. */
       NM_PREFIX + "opportunistic-containers-max-queue-length";
+  /** Setting that controls whether distributed scheduling is enabled or not. */
+  public static final String DIST_SCHEDULING_ENABLED =
+      NM_PREFIX + "distributed-scheduling.enabled";
+  public static final boolean DEFAULT_DIST_SCHEDULING_ENABLED = false;
   /** Environment variables that will be sent to containers.*/
   public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";
@@ -2744,14 +2745,14 @@ public class YarnConfiguration extends Configuration {
   public static boolean isDistSchedulingEnabled(Configuration conf) {
     return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+        YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
   public static boolean isOpportunisticContainerAllocationEnabled(
       Configuration conf) {
     return conf.getBoolean(
   /* For debugging. Dump configurations to system output as XML format. */

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 407aaa2..d69a73c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -104,7 +104,6 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
     conf = new YarnConfiguration();
-    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4520eb7..d6ea32b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2693,7 +2693,7 @@
     Setting that controls whether distributed scheduling is enabled.
-    <name>yarn.distributed-scheduling.enabled</name>
+    <name>yarn.nodemanager.distributed-scheduling.enabled</name>
@@ -2702,7 +2702,7 @@
       Setting that controls whether opportunistic container allocation
       is enabled.
-    <name>yarn.opportunistic-container-allocation.enabled</name>
+    <name>yarn.resourcemanager.opportunistic-container-allocation.enabled</name>
@@ -2711,7 +2711,7 @@
     Number of nodes to be used by the Opportunistic Container Allocator for
     dispatching containers during container allocation.
-    <name>yarn.opportunistic-container-allocation.nodes-used</name>
+    <name>yarn.resourcemanager.opportunistic-container-allocation.nodes-used</name>
@@ -2719,7 +2719,7 @@
     Frequency for computing least loaded NMs.
-    <name>yarn.nm-container-queuing.sorting-nodes-interval-ms</name>
+    <name>yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms</name>
@@ -2727,7 +2727,7 @@
     Comparator for determining node load for Distributed Scheduling.
-    <name>yarn.nm-container-queuing.load-comparator</name>
+    <name>yarn.resourcemanager.nm-container-queuing.load-comparator</name>
@@ -2735,7 +2735,7 @@
     Value of standard deviation used for calculation of queue limit thresholds.
-    <name>yarn.nm-container-queuing.queue-limit-stdev</name>
+    <name>yarn.resourcemanager.nm-container-queuing.queue-limit-stdev</name>
@@ -2743,32 +2743,32 @@
     Min length of container queue at NodeManager.
-    <name>yarn.nm-container-queuing.min-queue-length</name>
-    <value>1</value>
+    <name>yarn.resourcemanager.nm-container-queuing.min-queue-length</name>
+    <value>5</value>
     Max length of container queue at NodeManager.
-    <name>yarn.nm-container-queuing.max-queue-length</name>
-    <value>10</value>
+    <name>yarn.resourcemanager.nm-container-queuing.max-queue-length</name>
+    <value>15</value>
     Min queue wait time for a container at a NodeManager.
-    <name>yarn.nm-container-queuing.min-queue-wait-time-ms</name>
-    <value>1</value>
+    <name>yarn.resourcemanager.nm-container-queuing.min-queue-wait-time-ms</name>
+    <value>10</value>
     Max queue wait time for a container queue at a NodeManager.
-    <name>yarn.nm-container-queuing.max-queue-wait-time-ms</name>
-    <value>10</value>
+    <name>yarn.resourcemanager.nm-container-queuing.max-queue-wait-time-ms</name>
+    <value>100</value>

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 6ec3dff..dced31b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -321,7 +321,7 @@ public class NodeManager extends CompositeService
     boolean isDistSchedulingEnabled =
-            YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+            YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
     this.context = createNMContext(containerTokenSecretManager,
         nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 79882aa..dc56090 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -79,7 +79,7 @@ import com.google.common.base.Preconditions;
  * to intercept and inspect messages from application master to the cluster
  * resource manager. It listens to messages from the application master and
  * creates a request intercepting pipeline instance for each application. The
- * pipeline is a chain of intercepter instances that can inspect and modify the
+ * pipeline is a chain of interceptor instances that can inspect and modify the
  * request/response as needed.
 public class AMRMProxyService extends AbstractService implements

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 74a01e6..9d5246f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -289,7 +289,9 @@ public class ContainerManagerImpl extends CompositeService implements
   protected void createAMRMProxyService(Configuration conf) {
     this.amrmProxyEnabled =
-            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED) ||
+            conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+                YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
     if (amrmProxyEnabled) {
       LOG.info("AMRMProxyService is enabled. "

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 780c1db..99c040e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -106,7 +106,7 @@ public class ContainerScheduler extends AbstractService implements
     this(context, dispatcher, metrics, context.getConf().getInt(

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 2779992..10e5275 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -112,11 +112,11 @@ public class OpportunisticContainerAllocatorAMService
     this.k = rmContext.getYarnConfiguration().getInt(
     long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
     this.cacheRefreshInterval = nodeSortInterval;
     this.lastCacheUpdateTime = System.currentTimeMillis();
     NodeQueueLoadMonitor.LoadComparator comparator =
@@ -124,14 +124,14 @@ public class OpportunisticContainerAllocatorAMService
     NodeQueueLoadMonitor topKSelector =
         new NodeQueueLoadMonitor(nodeSortInterval, comparator);
     float sigma = rmContext.getYarnConfiguration()
     int limitMin, limitMax;
@@ -139,22 +139,22 @@ public class OpportunisticContainerAllocatorAMService
       limitMin = rmContext.getYarnConfiguration()
       limitMax = rmContext.getYarnConfiguration()
     } else {
       limitMin = rmContext.getYarnConfiguration()
       limitMax = rmContext.getYarnConfiguration()
     topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index a66b093..02d3956 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -947,7 +947,7 @@ public class MockRM extends ResourceManager {
   protected ApplicationMasterService createApplicationMasterService() {
     if (this.rmContext.getYarnConfiguration().getBoolean(
       return new OpportunisticContainerAllocatorAMService(getRMContext(),
           scheduler) {

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 0aad30b..b7b956e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -849,7 +849,9 @@ public class MiniYARNCluster extends CompositeService {
     protected void createAMRMProxyService(Configuration conf) {
       this.amrmProxyEnabled =
-              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED) ||
+              conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+                  YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
       if (this.amrmProxyEnabled) {
         LOG.info("CustomAMRMProxyService is enabled. "
@@ -879,7 +881,9 @@ public class MiniYARNCluster extends CompositeService {
     protected void createAMRMProxyService(Configuration conf) {
       this.amrmProxyEnabled =
-              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED) ||
+              conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+                  YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
       if (this.amrmProxyEnabled) {
         LOG.info("CustomAMRMProxyService is enabled. "

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
new file mode 100644
index 0000000..ac26d88
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
@@ -0,0 +1,225 @@
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+Opportunistic Containers
+* [Purpose](#Purpose)
+* [Quick Guide](#Quick_Guide)
+    * [Main Goal](#Main_Goal)
+    * [Enabling Opportunistic Containers](#Enabling_Opportunistic_Containers)
+    * [Running a Sample Job](#Running_a_Sample_Job)
+    * [Opportunistic Containers in Web UI](#Opportunistic_Containers_in_Web_UI)
+* [Overview](#Overview)
+* [Container Execution Types](#Container_Execution_Types)
+* [Execution of Opportunistic Containers](#Execution_of_Opportunistic_Containers)
+* [Allocation of Opportunistic Containers](#Allocation_of_Opportunistic_Containers)
+    * [Centralized Allocation](#Centralized_Allocation)
+    * [Distributed Allocation](#Distributed_Allocation)
+    * [Determining Nodes for Allocation](#Determining_Nodes_for_Allocation)
+    * [Rebalancing Node Load](#Rebalancing_Node_Load)
+* [Advanced Configuration](#Advanced_Configuration)
+* [Items for Future Work](#Items_for_Future_Work)
+<a name="Purpose"></a>Purpose
+This document introduces the notion of **opportunistic** container execution, and discusses how opportunistic containers are allocated and executed.
+<a name="Quick_Guide"></a>Quick Guide
+We start by providing a brief overview of opportunistic containers, including how a user can enable this feature and run a sample job using such containers.
+###<a name="Main_Goal"></a>Main Goal
+Unlike existing YARN containers that are scheduled in a node only if there are unallocated resources, opportunistic containers can be dispatched to an NM, even if their execution at that node cannot start immediately. In such a case, opportunistic containers will be queued at that NM until resources become available. 
+The main goal of opportunistic container execution is to improve cluster resource utilization, and therefore increase task throughput. Resource utilization and task throughput improvements are more pronounced for workloads that include relatively short tasks (in the order of seconds).
+###<a name="Enabling_Opportunistic_Containers"></a>Enabling Opportunistic Containers
+To enable opportunistic container allocation, the following two properties have to be present in **conf/yarn-site.xml**:
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.opportunistic-container-allocation.enabled` | Enables opportunistic container allocation. | `false` |
+| `yarn.nodemanager.opportunistic-containers-max-queue-length` | Determines the max number of opportunistic containers that can be queued at an NM. | `0` |
+The first parameter above has to be set to `true`. The second one has to be set to a positive value to allow queuing of opportunistic containers at the NM. A value of `10` can be used to start experimenting with opportunistic containers. The optimal value depends on the jobs' characteristics, the cluster configuration and the target utilization.
+By default, allocation of opportunistic containers is performed centrally through the RM. However, a user can choose to enable distributed allocation of opportunistic containers, which can further improve allocation latency for short tasks. Distributed scheduling can be enabled by setting the following parameter to `true` (note that non-opportunistic containers will continue being scheduled through the RM):
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.nodemanager.distributed-scheduling.enabled` | Enables distributed scheduling. | `false` |
+###<a name="Running_a_Sample_Job"></a>Running a Sample Job
+The following command can be used to run a sample pi map-reduce job, executing 40% of mappers using opportunistic containers (substitute `3.0.0-alpha2-SNAPSHOT` below with the version of Hadoop you are using):
+$ hadoop jar hadoop-3.0.0-alpha2-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha2-SNAPSHOT.jar pi -Dmapreduce.job.num-opportunistic-maps-percentage="40" 50 100
+By changing the value of `mapreduce.job.num-opportunistic-maps-percentage` in the above command, we can specify the percentage of mappers that can be executed through opportunistic containers.
+###<a name="Opportunistic_Containers_in_Web_UI"></a>Opportunistic Containers in Web UI
+When opportunistic container allocation is enabled, the following new columns can be observed in the Nodes page of the Web UI (`rm-address:8088/cluster/nodes`):
+* Running Containers (O): Number of running opportunistic containers on each node;
+* Mem Used (O): Total memory used by opportunistic containers on each node;
+* VCores Used (O): Total CPU virtual cores used by opportunistic containers on each node;
+* Queued Containers: Number of containers queued at each node.
+When clicking on a specific container running on a node, the execution type of the container is also shown.
+In the rest of the document, we provide an in-depth description of opportunistic containers, including details about their allocation and execution.
+Overview <a name="Overview"></a>
+The existing schedulers in YARN (Fair and Capacity Scheduler) allocate containers to a node only if there are unallocated resources at that node at the moment of scheduling the containers. This **guaranteed** type of execution has the advantage that once the AM dispatches a container to a node, the container execution will start immediately, since it is guaranteed that there will be available resources. Moreover, unless fairness or capacity constraints are violated, containers are guaranteed to run to completion without being preempted. 
+Although this design offers a more predictable task execution, it has two main drawbacks that can lead to suboptimal cluster resource utilization:
+* **Feedback delays.** When a container finishes its execution at a node, the RM gets notified that there are available resources through the next NM-RM heartbeat, then the RM schedules a new container at that node, the AM gets notified through the next AM-RM heartbeat, and finally the AM launches the new container at the node. These delays result in idle node resources, which in turn lead to lower resource utilization, especially when workloads involve tasks whose duration is relatively short.
+* **Allocated vs. utilized resources.** The RM allocates containers based on the *allocated* resources at each node, which might be significantly higher than the actually *utilized* resources (e.g., think of a container for which 4GB memory have been allocated, but only 2GB are being utilized). This lowers effective resource utilization, and can be avoided if the RM takes into account the utilized resources during scheduling. However, this has to be done in a way that allows resources to be reclaimed in case the utilized resources of a running container increase.
+To mitigate the above problems, in addition to the existing containers (which we term **guaranteed** containers hereafter), we introduce the notion of **opportunistic** containers. An opportunistic container can be dispatched to an NM, even if there are no available (unallocated) resources for it at the moment of scheduling. In such a case, the opportunistic container will be queued at the NM, waiting for resources to become available for its execution to start. The opportunistic containers are of lower priority than the guaranteed ones, which means that they can be preempted for guaranteed containers to start their execution. Therefore, they can be used to improve cluster resource utilization without impacting the execution of existing guaranteed containers.
+An additional advantage of opportunistic containers is that they introduce a notion of **execution priority at the NMs**. For instance, a lower priority job that does not require strict execution guarantees can use opportunistic containers or a mix of container execution types for its tasks.
+We have introduced two ways of allocating opportunistic containers: a **centralized** and a **distributed** one. In the centralized scheduling, opportunistic containers are allocated through the YARN RM, whereas in the distributed one, through local schedulers that reside at each NM. Centralized allocation allows for higher quality placement decisions and for implementing more involved sharing policies across applications (e.g., fairness). On the other hand, distributed scheduling can offer faster container allocation, which is useful for short tasks, as it avoids the round-trip to the RM. In both cases, the scheduling of guaranteed containers remains intact and happens through the YARN RM (using the existing Fair or Capacity Scheduler).
+Note that in the current implementation, we are allocating containers based on allocated (and not utilized) resources. Therefore, we tackle the "feedback delays" problem mentioned above, but not the "allocated vs. utilized resources" one. There is ongoing work (`YARN-1011`) that employs opportunistic containers to address the latter problem too.
+Below, we describe in more detail the [container execution types](#Container_Execution_Types), as well as the [execution](#Execution_of_Opportunistic_Containers) (including the container queuing at the NMs) and [allocation](#Allocation_of_Opportunistic_Containers) of opportunistic containers. Then we discuss how to fine-tune opportunistic containers through some [advanced configuration parameters](#Advanced_Configuration). Finally, we discuss open items for [future work](#Items_for_Future_Work).
+<a name="Container_Execution_Types"></a>Container Execution Types
+We introduce the following two types of containers:
+* **Guaranteed containers** correspond to the existing YARN containers. They are allocated by the Fair or Capacity Scheduler, and once dispatched to a node, it is guaranteed that there are available resources for their execution to start immediately. Moreover, these containers run to completion (as long as there are no failures). They can be preempted only in case the scheduler queue to which they belong violates fairness or capacity constraints.
+* **Opportunistic containers** are not guaranteed to have resources for their execution to start when they get dispatched to a node. Instead, they might be queued at the NM until resources become available. In case a guaranteed container arrives at a node and there are no resources available for it, one or more opportunistic containers will be preempted to execute the guaranteed one.
+When an AM submits its resource requests to the RM, it specifies the type for each container (default is guaranteed), determining the way the container will be [allocated](#Allocation_of_Opportunistic_Containers). Subsequently, when the container is launched by the AM at an NM, its type determines how it will be [executed](#Execution_of_Opportunistic_Containers) by the NM.
+<a name="Execution_of_Opportunistic_Containers"></a>Execution of Opportunistic Containers
+When a container arrives at an NM, its execution is determined by the available resources at the NM and the container type. Guaranteed containers start their execution immediately, and if needed, the NM will kill running opportunistic containers to ensure there are sufficient resources for the guaranteed ones to start. On the other hand, opportunistic containers can be queued at the NM, if there are no resources available to start their execution when they arrive at the NM. To enable this, we extended the NM by allowing queuing of containers at each node. The NM monitors the local resources, and when there are sufficient resources available, it starts the execution of the opportunistic container that is at the head of the queue.
+In particular, when a container arrives at an NM, localization is performed (i.e., all required resources are downloaded), and then the container moves to a `SCHEDULED` state, in which the container is queued, waiting for its execution to begin:
+* If there are available resources, the execution of the container starts immediately, irrespective of its execution type.
+* If there are no available resources:
+    * If the container is guaranteed, we kill as many running opportunistic containers as required for the guaranteed container to be executed, and then start its execution.
+    * If the container is opportunistic, it remains at the queue until resources become available.
+* When a container (guaranteed or opportunistic) finishes its execution and resources get freed up, we examine the queued containers and if there are available resources we start their execution. We pick containers from the queue in a FIFO order.
+In the [future work items](#Items_for_Future_Work) below, we discuss different ways of prioritizing task execution (queue reordering) and of killing opportunistic containers to make space for guaranteed ones.
+<a name="Allocation_of_Opportunistic_Containers"></a>Allocation of Opportunistic Containers
+As mentioned above, we provide both a centralized and a distributed way of allocating opportunistic containers, which we describe below.
+###<a name="Centralized_Allocation"></a>Centralized Allocation
+We have introduced a new service at the RM, namely the `OpportunisticContainerAllocatorAMService`, which extends the `ApplicationMasterService`. When the centralized opportunistic allocation is enabled, the resource requests from the AMs are served at the RM side by the `OpportunisticContainerAllocatorAMService`, which splits them into two sets of resource requests: 
+* The guaranteed set is forwarded to the existing `ApplicationMasterService` and is subsequently handled by the Fair or Capacity Scheduler.
+* The opportunistic set is handled by the new `OpportunisticContainerAllocator`, which performs the scheduling of opportunistic containers to nodes.
+The `OpportunisticContainerAllocator` maintains a list with the [least loaded nodes](#Determining_Nodes_for_Allocation) of the cluster at each moment, and assigns containers to them in a round-robin fashion. Note that in the current implementation, we purposely do not take into account node locality constraints. Since an opportunistic container (unlike the guaranteed ones) might wait at the queue of an NM before its execution starts, it is more important to allocate it at a node that is less loaded (i.e., where queuing delay will be smaller) rather than respect its locality constraints. Moreover, we do not take into account sharing (fairness/capacity) constraints for opportunistic containers at the moment. Support for both locality and sharing constraints can be added in the future if required.
+###<a name="Distributed_Allocation"></a>Distributed Allocation
+In order to enable distributed scheduling of opportunistic containers, we have introduced a new service at each NM, called `AMRMProxyService`. The `AMRMProxyService` implements the `ApplicationMasterService` protocol, and acts as a proxy between the AMs running at that node and the RM. When the `AMRMProxyService` is enabled (through a parameter), we force all AMs running at a particular node to communicate with the `AMRMProxyService` of the same node, instead of going directly to the RM. Moreover, to ensure that the AMs will not talk directly with the RM, when a new AM gets initialized, we replace its `AMRMToken` with a token signed by the `AMRMProxyService`.
+A chain of interceptors can be registered with the `AMRMProxyService`. One of these interceptors is the `DistributedScheduler` that is responsible for allocating opportunistic containers in a distributed way, without needing to contact the RM. This modular design makes the `AMRMProxyService` instrumental in other scenarios too, such as YARN federation (`YARN-2915`) or throttling down misbehaving AMs, which can be enabled simply by adding further interceptors to the interceptor chain.
+When distributed opportunistic scheduling is enabled, each AM sends its resource requests to the `AMRMProxyService` running at the same node. The `AMRMProxyService` splits the resource requests into two sets:
+* The guaranteed set is forwarded to the RM. In this case the `AMRMProxyService` simply acts as a proxy between the AM and the RM, and the container allocation remains intact (using the Fair or Capacity Scheduler).
+* The opportunistic set is not forwarded to the RM. Instead, it is handled by the `DistributedScheduler` that is running locally at the node. In particular, the `DistributedScheduler` maintains a list with the least loaded nodes in the cluster, and allocates containers to them in a round-robin fashion. The RM informs the `DistributedScheduler` about the least loaded nodes at regular intervals through the NM-RM heartbeats.
+The above procedure is similar to the one performed by the `OpportunisticContainerAllocatorAMService` in the case of centralized opportunistic scheduling described above. The main difference is that in the distributed case, the splitting of requests into guaranteed and opportunistic happens locally at the node, and only the guaranteed requests are forwarded to the RM, while the opportunistic ones are handled without contacting the RM.
+###<a name="Determining_Nodes_for_Allocation"></a>Determining Nodes for Allocation
+Each NM informs the RM periodically through the NM-RM heartbeats about the number of running guaranteed and opportunistic containers, as well as the number of queued opportunistic containers. The RM gathers this information from all nodes and determines the least loaded ones.
+In the case of centralized allocation of opportunistic containers, this information is immediately available, since the allocation happens centrally. In the case of distributed scheduling, the list with the least loaded nodes is propagated to all NMs (and thus becomes available to the `DistributedSchedulers`) through the heartbeat responses from the RM to the NMs. The number of least loaded nodes sent to the NMs is configurable.
+At the moment, we take into account only the number of queued opportunistic containers at each node in order to estimate the time an opportunistic container would have to wait if sent to that node and, thus, determine the least loaded nodes. If the AM provided us with information about the estimated task durations, we could take them into account in order to have better estimates of the queue waiting times.
+###<a name="Rebalancing_Node_Load"></a>Rebalancing Node Load
+Occasionally, poor placement choices for opportunistic containers may be made (due to stale queue length estimates), which can lead to load imbalance between nodes. The problem is more pronounced under high cluster load, and also in the case of distributed scheduling (multiple `DistributedSchedulers` may place containers at the same NM, since they do not coordinate with each other). To deal with this load imbalance between the NM queues, we perform load shedding to dynamically re-balance the load between NMs. In particular, while aggregating at the RM the queue time estimates published by each NM, we construct a distribution and find a targeted maximal value for the length of the NM queues (based on the mean and standard deviation of the distribution). Then the RM disseminates this value to the various NMs through the heartbeat responses. Subsequently, using this information, an NM on a node whose queue length is above the threshold discards opportunistic containers to meet this maximal value. This forces the associated individual AMs to reschedule those containers elsewhere.
+<a name="Advanced_Configuration"></a>Advanced Configuration
+The main properties for enabling opportunistic container allocation and choosing between centralized and distributed allocation were described in the [quick guide](#Quick_Guide) in the beginning of this document. Here we present more advanced configuration. Note that using default values for those parameters should be sufficient in most cases. All parameters below have to be defined in the **conf/yarn-site.xml** file.
+To determine the number of [least loaded nodes](#Determining_Nodes_for_Allocation) that will be used when scheduling opportunistic containers and how often this list will be refreshed, we use the following parameters:
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.opportunistic-container-allocation.nodes-used` | Number of least loaded nodes to be used by the Opportunistic Container allocator for dispatching containers during container allocation. A higher value can improve load balance in large clusters. | `10` |
+| `yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms` | Frequency for computing least loaded nodes. | `1000` |
+As discussed in the [node load rebalancing](#Rebalancing_Node_Load) section above, at regular intervals, the RM gathers all NM queue lengths and computes their mean value (`avg`) and standard deviation (`stdev`), as well as the value `avg + k*stdev` (where `k` is a float). This value gets propagated through the NM-RM heartbeats to all NMs, which should respect that value by dequeuing containers (if required), as long as their current queue length is between a `queue_min_length` and a `queue_max_length` value (these values are used to avoid dequeuing tasks from very short queues and to aggressively dequeue tasks from long queues, respectively).
+The parameters `k`, `queue_min_length` and `queue_max_length` can be specified as follows:
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.nm-container-queuing.queue-limit-stdev` | The `k` parameter. | `1.0f` |
+| `yarn.resourcemanager.nm-container-queuing.min-queue-length` | The `queue_min_length` parameter. | `5` |
+| `yarn.resourcemanager.nm-container-queuing.max-queue-length` | The `queue_max_length` parameter. | `15` |
+Finally, two more properties can further tune the `AMRMProxyService` in case distributed scheduling is used:
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.nodemanager.amrmproxy.address` | The address/port to which the `AMRMProxyService` is bound. | `0.0.0.0:8049` |
+| `yarn.nodemanager.amrmproxy.client.thread-count` | The number of threads that are used at each NM for serving the interceptors registered with the `AMRMProxyService` by different jobs. | `3` |
+<a name="Items_for_Future_Work"></a>Items for Future Work
+Here we describe multiple ways in which we can extend/enhance the allocation and execution of opportunistic containers. We also provide the JIRAs that track each item.
+* **Resource overcommitment** (`YARN-1011`). As already discussed, in order to further improve the cluster resource utilization, we can schedule containers not based on the allocated resources but on the actually utilized ones. When over-committing resources, there is the risk of running out of resources in case we have an increase in the utilized resources of the already running containers. Therefore, opportunistic execution should be used for containers whose allocation goes beyond the capacity of a node. This way, we can choose opportunistic containers to kill for reclaiming resources.
+* **NM Queue reordering** (`YARN-5886`). Instead of executing queued containers in a FIFO order, we can employ reordering strategies that dynamically determine which opportunistic container will be executed next. For example, we can prioritize containers that are expected to be short-running or which belong to applications that are close to completion.
+* **Out of order killing at NMs** (`YARN-5887`). As described above, when we need to free up resources for a guaranteed container to start its execution, we kill opportunistic containers in reverse order of arrival (first the most recently started ones). This might not always be the right decision. For example, we might want to minimize the number of containers killed or to refrain from killing containers of jobs that are very close to completion.
+* **Container pausing** (`YARN-5292`). At the moment we kill opportunistic containers to make room for guaranteed ones in case of resource contention. In busy clusters this can lower the effective cluster utilization: whenever we kill a running opportunistic container, it has to be restarted, and thus we lose work. To this end, we can instead pause running opportunistic containers. Note that this will require support from the container executor (e.g., the container technology used) and from the application.
+* **Container promotion** (`YARN-5085`). There are cases where changing the execution type of a container during its execution can be beneficial. For instance, an application might submit a container as opportunistic, and when its execution starts, it can request its promotion to a guaranteed container to avoid it getting killed.

To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

View raw message