hadoop-common-commits mailing list archives

From asur...@apache.org
Subject [2/2] hadoop git commit: YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh)
Date Tue, 09 Aug 2016 07:49:31 GMT
YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh)

(cherry picked from commit 82c9e061017c32e633e0b0cbb7978749a6df4fb2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5f7edb79
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5f7edb79
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5f7edb79

Branch: refs/heads/branch-2
Commit: 5f7edb79d155df57c0b5cc8d8d2b0f908dee8ee9
Parents: 427b540
Author: Arun Suresh <asuresh@apache.org>
Authored: Fri Aug 5 11:13:05 2016 -0700
Committer: Arun Suresh <asuresh@apache.org>
Committed: Tue Aug 9 00:46:08 2016 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/TestMROpportunisticMaps.java  |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     | 107 +++---
 .../api/impl/TestDistributedScheduling.java     |   4 +-
 .../src/main/resources/yarn-default.xml         |  50 +--
 .../OpportunisticContainerAllocator.java        | 378 +++++++++++++++++++
 .../OpportunisticContainerContext.java          | 178 +++++++++
 .../yarn/server/scheduler/package-info.java     |  22 ++
 .../hadoop/yarn/server/nodemanager/Context.java |   2 +-
 .../yarn/server/nodemanager/NodeManager.java    |   9 +-
 .../amrmproxy/DefaultRequestInterceptor.java    |  67 +++-
 .../scheduler/DistributedScheduler.java         | 264 +++----------
 .../OpportunisticContainerAllocator.java        | 190 ----------
 .../amrmproxy/BaseAMRMProxyTest.java            |   2 +-
 .../scheduler/TestDistributedScheduler.java     |  10 +-
 .../DistributedSchedulingAMService.java         | 361 ------------------
 ...pportunisticContainerAllocatorAMService.java | 367 ++++++++++++++++++
 .../server/resourcemanager/ResourceManager.java |  32 +-
 .../yarn/server/resourcemanager/MockRM.java     |   7 +-
 .../TestDistributedSchedulingAMService.java     | 269 -------------
 ...pportunisticContainerAllocatorAMService.java | 271 +++++++++++++
 20 files changed, 1445 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
index dfe85f2..021863b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
@@ -91,6 +91,8 @@ public class TestMROpportunisticMaps {
       Configuration conf = new Configuration();
       // Start the mini-MR and mini-DFS clusters
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.
+          OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
       conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
       conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
       dfsCluster = new MiniDFSCluster.Builder(conf)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8aa0419..c341022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -301,55 +301,60 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.enabled";
   public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
 
-  /** Minimum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
-
-  /** Minimum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.min-container-vcores";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
-
-  /** Maximum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
-  public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
-
-  /** Maximum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.max-container-vcores";
-  public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
-
-  /** Incremental memory (in MB) used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+  /** Setting that controls whether opportunistic container allocation
+   *  is enabled or not. */
+  public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED =
+      YARN_PREFIX + "opportunistic-container-allocation.enabled";
+  public static final boolean
+      OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
+
+  /** Minimum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.min-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;
+
+  /** Minimum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
+      YARN_PREFIX + "opportunistic-containers.min-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;
+
+  /** Maximum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.max-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
+
+  /** Maximum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
+      YARN_PREFIX + "opportunistic-containers.max-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
+
+  /** Incremental memory (in MB) used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
       512;
 
-  /** Incremental virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.incr-vcores";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
-
-  /** Container token expiry for container allocated via distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
-      YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
-  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+  /** Incremental virtual CPU cores used for allocating an opportunistic
+   * container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
+      YARN_PREFIX + "opportunistic-containers.incr-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;
+
+  /** Container token expiry for opportunistic containers. */
+  public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
+      YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
+  public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
       600000;
 
-  /** Number of nodes to be used by the LocalScheduler of a NodeManager for
-   * dispatching containers during distributed scheduling. */
-  public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
-      YARN_PREFIX + "distributed-scheduling.nodes-used";
-  public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
+  /** Number of nodes to be used by the Opportunistic Container allocator for
+   * dispatching containers during container allocation. */
+  public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
+      YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
+  public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT =
+      10;
 
   /** Frequency for computing least loaded NMs. */
   public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -2733,6 +2738,18 @@ public class YarnConfiguration extends Configuration {
     return clusterId;
   }
 
+  public static boolean isDistSchedulingEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+  }
+
+  public static boolean isOpportunisticContainerAllocationEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
+  }
+
  /* For debugging. Dump configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 7245bc6..11da7ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -84,7 +84,7 @@ import static org.mockito.Mockito.when;
  * specifying OPPORTUNISTIC containers in its resource requests,
  * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
  * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingAMService running on the RM.
+ * to the OpportunisticContainerAllocatorAMService running on the RM.
  */
 public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
@@ -105,6 +105,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.
+        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
     cluster.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9b16d05..2a8bd2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2678,72 +2678,76 @@
 
   <property>
     <description>
-    Minimum memory (in MB) used for allocating a container through distributed
-    scheduling.
+      Setting that controls whether opportunistic container allocation
+      is enabled.
     </description>
-    <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+    <name>yarn.opportunistic-container-allocation.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Minimum memory (in MB) used for allocating an opportunistic container.
+    </description>
+    <name>yarn.opportunistic-containers.min-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-    Minimum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+      Minimum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.min-container-vcores</name>
+    <name>yarn.opportunistic-containers.min-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-    Maximum memory (in MB) used for allocating a container through distributed
-    scheduling.
+    Maximum memory (in MB) used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.max-memory-mb</name>
     <value>2048</value>
   </property>
 
   <property>
     <description>
-    Maximum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Maximum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.max-container-vcores</name>
+    <name>yarn.opportunistic-containers.max-vcores</name>
     <value>4</value>
   </property>
 
   <property>
     <description>
-    Incremental memory (in MB) used for allocating a container through
-    distributed scheduling.
+    Incremental memory (in MB) used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.incr-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-    Incremental virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Incremental virtual CPU cores used for allocating an opportunistic
+      container.
     </description>
-    <name>yarn.distributed-scheduling.incr-vcores</name>
+    <name>yarn.opportunistic-containers.incr-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-    Container token expiry for container allocated via distributed scheduling.
+    Container token expiry for opportunistic containers.
     </description>
-    <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+    <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
     <value>600000</value>
   </property>
 
   <property>
     <description>
-    Number of nodes to be used by the LocalScheduler of a NodeManager for
-    dispatching containers during distributed scheduling.
+    Number of nodes to be used by the Opportunistic Container Allocator for
+    dispatching containers during container allocation.
     </description>
-    <name>yarn.distributed-scheduling.nodes-used</name>
+    <name>yarn.opportunistic-container-allocation.nodes-used</name>
     <value>10</value>
   </property>
 
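In a deployment, the equivalents of the renamed properties above go into yarn-site.xml; a minimal sketch that turns the feature on and keeps the shipped defaults for everything else (illustrative, not part of this commit):

<configuration>
  <property>
    <name>yarn.opportunistic-container-allocation.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.distributed-scheduling.enabled</name>
    <value>true</value>
  </property>
</configuration>
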

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
new file mode 100644
index 0000000..41b5d56
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * </p>
+ */
+public class OpportunisticContainerAllocator {
+
+  /**
+   * This class encapsulates application specific parameters used to build a
+   * Container.
+   */
+  public static class AllocationParams {
+    private Resource maxResource;
+    private Resource minResource;
+    private Resource incrementResource;
+    private int containerTokenExpiryInterval;
+
+    /**
+     * Return Max Resource.
+     * @return Resource
+     */
+    public Resource getMaxResource() {
+      return maxResource;
+    }
+
+    /**
+     * Set Max Resource.
+     * @param maxResource Resource
+     */
+    public void setMaxResource(Resource maxResource) {
+      this.maxResource = maxResource;
+    }
+
+    /**
+     * Get Min Resource.
+     * @return Resource
+     */
+    public Resource getMinResource() {
+      return minResource;
+    }
+
+    /**
+     * Set Min Resource.
+     * @param minResource Resource
+     */
+    public void setMinResource(Resource minResource) {
+      this.minResource = minResource;
+    }
+
+    /**
+     * Get Incremental Resource.
+     * @return Incremental Resource
+     */
+    public Resource getIncrementResource() {
+      return incrementResource;
+    }
+
+    /**
+     * Set Incremental resource.
+     * @param incrementResource Resource
+     */
+    public void setIncrementResource(Resource incrementResource) {
+      this.incrementResource = incrementResource;
+    }
+
+    /**
+     * Get Container Token Expiry interval.
+     * @return Container Token Expiry interval
+     */
+    public int getContainerTokenExpiryInterval() {
+      return containerTokenExpiryInterval;
+    }
+
+    /**
+     * Set Container Token Expiry time in ms.
+     * @param containerTokenExpiryInterval Container Token Expiry in ms
+     */
+    public void setContainerTokenExpiryInterval(
+        int containerTokenExpiryInterval) {
+      this.containerTokenExpiryInterval = containerTokenExpiryInterval;
+    }
+  }
+
+  /**
+   * A Container Id Generator.
+   */
+  public static class ContainerIdGenerator {
+
+    protected volatile AtomicLong containerIdCounter = new AtomicLong(1);
+
+    /**
+     * This method can reset the generator to a specific value.
+     * @param containerIdStart containerId
+     */
+    public void resetContainerIdCounter(long containerIdStart) {
+      this.containerIdCounter.set(containerIdStart);
+    }
+
+    /**
+     * Sets the underlying Atomic Long. To be used when implementation needs to
+     * share the underlying AtomicLong of an existing counter.
+     * @param counter AtomicLong
+     */
+    public void setContainerIdCounter(AtomicLong counter) {
+      this.containerIdCounter = counter;
+    }
+
+    /**
+     * Generates a new long value. Default implementation increments the
+     * underlying AtomicLong. Subclasses are encouraged to override this
+     * behaviour.
+     * @return Counter.
+     */
+    public long generateContainerId() {
+      return this.containerIdCounter.incrementAndGet();
+    }
+  }
+
+  static class PartitionedResourceRequests {
+    private List<ResourceRequest> guaranteed = new ArrayList<>();
+    private List<ResourceRequest> opportunistic = new ArrayList<>();
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DominantResourceCalculator();
+
+  private final BaseContainerTokenSecretManager tokenSecretManager;
+  private int webpagePort;
+
+  /**
+   * Create a new Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   * @param webpagePort Webpage Port
+   */
+  public OpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
+    this.tokenSecretManager = tokenSecretManager;
+    this.webpagePort = webpagePort;
+  }
+
+  /**
+   * Entry point into the Opportunistic Container Allocator.
+   * @param request AllocateRequest
+   * @param applicationAttemptId ApplicationAttemptId
+   * @param appContext App Specific OpportunisticContainerContext
+   * @param rmIdentifier RM Identifier
+   * @param appSubmitter App Submitter
+   * @return List of Containers.
+   * @throws YarnException YarnException
+   */
+  public List<Container> allocateContainers(
+      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerContext appContext, long rmIdentifier,
+      String appSubmitter) throws YarnException {
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks =
+        partitionAskList(request.getAskList());
+
+    List<ContainerId> releasedContainers = request.getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      appContext.getContainersAllocated().removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    if (rbr != null) {
+      appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
+      appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority :
+        appContext.getOutstandingOpReqs().descendingKeySet()) {
+      // Allocated containers :
+      //  Key = Requested Capability,
+      //  Value = List of Containers of given Cap (The actual container size
+      //          might be different from what is requested, which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
+          appContext, priority, applicationAttemptId, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        appContext.matchAllocationToOutstandingRequest(
+            e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    // Send all the GUARANTEED Reqs to RM
+    request.setAskList(partitionedAsks.getGuaranteed());
+    return allocatedContainers;
+  }
+
+  private Map<Resource, List<Container>> allocate(long rmIdentifier,
+      OpportunisticContainerContext appContext, Priority priority,
+      ApplicationAttemptId appAttId, String userName) throws YarnException {
+    Map<Resource, List<Container>> containers = new HashMap<>();
+    for (ResourceRequest anyAsk :
+        appContext.getOutstandingOpReqs().get(priority).values()) {
+      allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
+          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
+          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+      LOG.info("Opportunistic allocation requested for ["
+          + "priority=" + anyAsk.getPriority()
+          + ", num_containers=" + anyAsk.getNumContainers()
+          + ", capability=" + anyAsk.getCapability() + "]"
+          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+    }
+    return containers;
+  }
+
+  private void allocateContainersInternal(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist, ApplicationAttemptId id,
+      Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      throws YarnException {
+    int toAllocate = anyAsk.getNumContainers()
+        - (containers.isEmpty() ? 0 :
+            containers.get(anyAsk.getCapability()).size());
+
+    List<NodeId> nodesForScheduling = new ArrayList<>();
+    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+      // Do not use blacklisted nodes for scheduling.
+      if (blacklist.contains(nodeEntry.getKey())) {
+        continue;
+      }
+      nodesForScheduling.add(nodeEntry.getValue());
+    }
+    int numAllocated = 0;
+    int nextNodeToSchedule = 0;
+    for (int numCont = 0; numCont < toAllocate; numCont++) {
+      nextNodeToSchedule++;
+      nextNodeToSchedule %= nodesForScheduling.size();
+      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
+      Container container = buildContainer(rmIdentifier, appParams, idCounter,
+          anyAsk, id, userName, nodeId);
+      List<Container> cList = containers.get(anyAsk.getCapability());
+      if (cList == null) {
+        cList = new ArrayList<>();
+        containers.put(anyAsk.getCapability(), cList);
+      }
+      cList.add(container);
+      numAllocated++;
+      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
+    }
+    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+  }
+
+  private Container buildContainer(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      ResourceRequest rr, ApplicationAttemptId id, String userName,
+      NodeId nodeId) throws YarnException {
+    ContainerId cId =
+        ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+    // Normalize the resource asks (similar to what the RM scheduler does
+    // before accepting an ask)
+    Resource capability = normalizeCapability(appParams, rr);
+
+    long currTime = System.currentTimeMillis();
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
+            capability, currTime + appParams.containerTokenExpiryInterval,
+            tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
+            rr.getPriority(), currTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            ExecutionType.OPPORTUNISTIC);
+    byte[] pwd =
+        tokenSecretManager.createPassword(containerTokenIdentifier);
+    Token containerToken = newContainerToken(nodeId, pwd,
+        containerTokenIdentifier);
+    Container container = BuilderUtils.newContainer(
+        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+        capability, rr.getPriority(), containerToken,
+        containerTokenIdentifier.getExecutionType(),
+        rr.getAllocationRequestId());
+    return container;
+  }
+
+  private Resource normalizeCapability(AllocationParams appParams,
+      ResourceRequest ask) {
+    return Resources.normalize(RESOURCE_CALCULATOR,
+        ask.getCapability(), appParams.minResource, appParams.maxResource,
+        appParams.incrementResource);
+  }
+
+  private static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
+        nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests partitionedRequests =
+        new PartitionedResourceRequests();
+    for (ResourceRequest rr : askList) {
+      if (rr.getExecutionTypeRequest().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        partitionedRequests.getOpportunistic().add(rr);
+      } else {
+        partitionedRequests.getGuaranteed().add(rr);
+      }
+    }
+    return partitionedRequests;
+  }
+}

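To make the entry point above concrete, here is a hypothetical driver (only the constructor and the allocateContainers() signature come from this file; the secret manager wiring, node name, ports and limits are made-up placeholders):

import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;

public class AllocatorDriverSketch {
  // The secret manager, request and identifiers are supplied by the caller;
  // how they are obtained is environment-specific and not shown here.
  static List<Container> allocate(
      BaseContainerTokenSecretManager tokenSecretManager,
      AllocateRequest request, ApplicationAttemptId attemptId,
      long rmIdentifier, String user) throws YarnException {
    OpportunisticContainerAllocator allocator =
        new OpportunisticContainerAllocator(tokenSecretManager, 8042);

    // Per-application context: resource limits, id generator, node map.
    OpportunisticContainerContext ctx = new OpportunisticContainerContext();
    ctx.getAppParams().setMinResource(Resource.newInstance(512, 1));
    ctx.getAppParams().setMaxResource(Resource.newInstance(2048, 4));
    ctx.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
    ctx.getAppParams().setContainerTokenExpiryInterval(600000);
    ctx.getNodeMap().put("node1", NodeId.newInstance("node1", 4545));

    // OPPORTUNISTIC asks are satisfied locally against the node map;
    // GUARANTEED asks are left in the request for the RM to handle.
    return allocator.allocateContainers(
        request, attemptId, ctx, rmIdentifier, user);
  }
}
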
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
new file mode 100644
index 0000000..1b701ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+
+/**
+ * This encapsulates application specific information used by the
+ * Opportunistic Container Allocator to allocate containers.
+ */
+public class OpportunisticContainerContext {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OpportunisticContainerContext.class);
+
+  // Currently just used to keep track of allocated containers.
+  // Can be used for reporting stats later.
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+  private AllocationParams appParams =
+      new AllocationParams();
+  private ContainerIdGenerator containerIdGenerator =
+      new ContainerIdGenerator();
+
+  private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  private final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequest (ask).
+  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  public Set<ContainerId> getContainersAllocated() {
+    return containersAllocated;
+  }
+
+  public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+    return appParams;
+  }
+
+  public ContainerIdGenerator getContainerIdGenerator() {
+    return containerIdGenerator;
+  }
+
+  public void setContainerIdGenerator(
+      ContainerIdGenerator containerIdGenerator) {
+    this.containerIdGenerator = containerIdGenerator;
+  }
+
+  public Map<String, NodeId> getNodeMap() {
+    return nodeMap;
+  }
+
+  public Map<NodeId, NMToken> getNodeTokens() {
+    return nodeTokens;
+  }
+
+  public Set<String> getBlacklist() {
+    return blacklist;
+  }
+
+  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+      getOutstandingOpReqs() {
+    return outstandingOpReqs;
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, ResourceName, Capability) and adds to the outstanding
+   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+   * the current YARN constraint that only a single ResourceRequest can exist at
+   * a given Priority and Capability.
+   *
+   * @param resourceAsks the list with the {@link ResourceRequest}s
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      Priority priority = request.getPriority();
+
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Map<Resource, ResourceRequest> reqMap =
+          outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+            + ", with capability = " + request.getCapability() + " ) : "
+            + resourceRequest.getNumContainers());
+      }
+    }
+  }
+
+  /**
+   * This method matches a returned list of Container Allocations to any
+   * outstanding OPPORTUNISTIC ResourceRequest.
+   * @param capability Capability
+   * @param allocatedContainers Allocated Containers
+   */
+  public void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container c : allocatedContainers) {
+      containersAllocated.add(c.getId());
+      Map<Resource, ResourceRequest> asks =
+          outstandingOpReqs.get(c.getPriority());
+
+      if (asks == null) {
+        continue;
+      }
+
+      ResourceRequest rr = asks.get(capability);
+      if (rr != null) {
+        rr.setNumContainers(rr.getNumContainers() - 1);
+        if (rr.getNumContainers() == 0) {
+          asks.remove(capability);
+        }
+      }
+    }
+  }
+}

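A short sketch of the bookkeeping contract above, using standard YARN record factories (standalone and illustrative, not part of this commit):

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;

public class ContextBookkeepingSketch {
  public static void main(String[] args) {
    OpportunisticContainerContext ctx = new OpportunisticContainerContext();
    Resource cap = Resource.newInstance(1024, 1);
    Priority pri = Priority.newInstance(1);

    // Only ANY-location asks with numContainers > 0 are recorded; repeated
    // asks at the same (priority, capability) have their counts summed.
    ResourceRequest ask =
        ResourceRequest.newInstance(pri, ResourceRequest.ANY, cap, 3);
    ctx.addToOutstandingReqs(Collections.singletonList(ask));

    // Prints 3: three containers outstanding at priority 1, 1024MB x 1 vcore.
    System.out.println(
        ctx.getOutstandingOpReqs().get(pri).get(cap).getNumContainers());

    // As containers come back, matchAllocationToOutstandingRequest(cap,
    // allocatedContainers) decrements this count and drops the ask at zero.
  }
}
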
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
new file mode 100644
index 0000000..dd56829
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes used for Scheduling.
+ */
+package org.apache.hadoop.yarn.server.scheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index cfcf1bd..8ef5899 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index f7d226e..84c6eeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -71,7 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -327,7 +327,8 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
 
     boolean isDistSchedulingEnabled =
-        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        conf.getBoolean(YarnConfiguration.
+            OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,
@@ -361,8 +362,8 @@ public class NodeManager extends CompositeService
     ((NMContext) context).setWebServer(webServer);
 
     ((NMContext) context).setQueueableContainerAllocator(
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
-            webServer.getPort()));
+        new OpportunisticContainerAllocator(
+            context.getContainerTokenSecretManager(), webServer.getPort()));
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 75fe022..efbdfb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -62,7 +62,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private DistributedSchedulingAMProtocol rmClient;
+  private ApplicationMasterProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -76,15 +76,7 @@ public final class DefaultRequestInterceptor extends
       user.addToken(appContext.getAMRMToken());
       final Configuration conf = this.getConf();
 
-      rmClient = user.doAs(
-          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
-            @Override
-            public DistributedSchedulingAMProtocol run() throws Exception {
-              setAMRMTokenService(conf);
-              return ServerRMProxy.createRMProxy(conf,
-                  DistributedSchedulingAMProtocol.class);
-            }
-          });
+      rmClient = createRMClient(appContext, conf);
     } catch (IOException e) {
       String message =
           "Error while creating of RM app master service proxy for attemptId:"
@@ -100,6 +92,32 @@ public final class DefaultRequestInterceptor extends
     }
   }
 
+  private ApplicationMasterProtocol createRMClient(
+      AMRMProxyApplicationContext appContext, final Configuration conf)
+      throws IOException, InterruptedException {
+    if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
+      return user.doAs(
+          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
+            @Override
+            public DistributedSchedulingAMProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulingAMProtocol.class);
+            }
+          });
+    } else {
+      return user.doAs(
+          new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       final RegisterApplicationMasterRequest request)
@@ -127,9 +145,15 @@ public final class DefaultRequestInterceptor extends
   registerApplicationMasterForDistributedScheduling
       (RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
-    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
-        "request to the real YARN RM");
-    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+          "request to the real YARN RM");
+      return ((DistributedSchedulingAMProtocol)rmClient)
+          .registerApplicationMasterForDistributedScheduling(request);
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
+    }
   }
 
   @Override
@@ -140,13 +164,18 @@ public final class DefaultRequestInterceptor extends
       LOG.debug("Forwarding allocateForDistributedScheduling request" +
           "to the real YARN RM");
     }
-    DistributedSchedulingAllocateResponse allocateResponse =
-        rmClient.allocateForDistributedScheduling(request);
-    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
-      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      DistributedSchedulingAllocateResponse allocateResponse =
+          ((DistributedSchedulingAMProtocol)rmClient)
+              .allocateForDistributedScheduling(request);
+      if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+        updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+      }
+      return allocateResponse;
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
     }
-
-    return allocateResponse;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index bfb12ee..368858c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-
-
-
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -76,74 +65,49 @@ import java.util.TreeMap;
  */
 public final class DistributedScheduler extends AbstractRequestInterceptor {
 
-  static class PartitionedResourceRequests {
-    private List<ResourceRequest> guaranteed = new ArrayList<>();
-    private List<ResourceRequest> opportunistic = new ArrayList<>();
-    public List<ResourceRequest> getGuaranteed() {
-      return guaranteed;
-    }
-    public List<ResourceRequest> getOpportunistic() {
-      return opportunistic;
-    }
-  }
-
-  static class DistributedSchedulerParams {
-    Resource maxResource;
-    Resource minResource;
-    Resource incrementResource;
-    int containerTokenExpiryInterval;
-  }
-
   private static final Logger LOG = LoggerFactory
       .getLogger(DistributedScheduler.class);
 
   private final static RecordFactory RECORD_FACTORY =
       RecordFactoryProvider.getRecordFactory(null);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
-
-  private DistributedSchedulerParams appParams =
-      new DistributedSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter
-      containerIdCounter =
-          new OpportunisticContainerAllocator.ContainerIdCounter();
-  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
-
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
-  final Set<String> blacklist = new HashSet<>();
-
-  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
-  // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequest (ask).
-  final TreeMap<Priority, Map<Resource, ResourceRequest>>
-      outstandingOpReqs = new TreeMap<>();
+  private OpportunisticContainerContext oppContainerContext =
+      new OpportunisticContainerContext();
 
   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
   private String appSubmitter;
-
-  public void init(AMRMProxyApplicationContext appContext) {
-    super.init(appContext);
-    initLocal(appContext.getApplicationAttemptId(),
-        appContext.getNMCotext().getContainerAllocator(),
-        appContext.getNMCotext().getNMTokenSecretManager(),
-        appContext.getUser());
+  private long rmIdentifier;
+
+  public void init(AMRMProxyApplicationContext applicationContext) {
+    super.init(applicationContext);
+    initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
+        .getRMIdentifier(),
+        applicationContext.getApplicationAttemptId(),
+        applicationContext.getNMCotext().getContainerAllocator(),
+        applicationContext.getNMCotext().getNMTokenSecretManager(),
+        applicationContext.getUser());
   }
 
   @VisibleForTesting
-  void initLocal(ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerAllocator containerAllocator,
+  void initLocal(long rmId, ApplicationAttemptId appAttemptId,
+      OpportunisticContainerAllocator oppContainerAllocator,
       NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
-    this.applicationAttemptId = applicationAttemptId;
-    this.containerAllocator = containerAllocator;
+    this.rmIdentifier = rmId;
+    this.applicationAttemptId = appAttemptId;
+    this.containerAllocator = oppContainerAllocator;
     this.nmSecretManager = nmSecretManager;
     this.appSubmitter = appSubmitter;
+
+    // Overrides the Generator to decrement container id.
+    this.oppContainerContext.setContainerIdGenerator(
+        new OpportunisticContainerAllocator.ContainerIdGenerator() {
+          @Override
+          public long generateContainerId() {
+            return this.containerIdCounter.decrementAndGet();
+          }
+        });
   }
 
   /**
@@ -202,7 +166,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     if (allocatedContainers.size() > 0) {
       response.getAllocatedContainers().addAll(allocatedContainers);
       for (Container alloc : allocatedContainers) {
-        if (!nodeTokens.containsKey(alloc.getNodeId())) {
+        if (!oppContainerContext.getNodeTokens().containsKey(
+            alloc.getNodeId())) {
           newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
         }
       }
@@ -212,115 +177,34 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     }
   }
 
-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
-    PartitionedResourceRequests partitionedRequests =
-        new PartitionedResourceRequests();
-    for (ResourceRequest rr : askList) {
-      if (rr.getExecutionTypeRequest().getExecutionType() ==
-          ExecutionType.OPPORTUNISTIC) {
-        partitionedRequests.getOpportunistic().add(rr);
-      } else {
-        partitionedRequests.getGuaranteed().add(rr);
-      }
-    }
-    return partitionedRequests;
-  }
-
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinContainerResource();
-    appParams.maxResource = registerResponse.getMaxContainerResource();
-    appParams.incrementResource =
-        registerResponse.getIncrContainerResource();
-    if (appParams.incrementResource == null) {
-      appParams.incrementResource = appParams.minResource;
+    oppContainerContext.getAppParams().setMinResource(
+        registerResponse.getMinContainerResource());
+    oppContainerContext.getAppParams().setMaxResource(
+        registerResponse.getMaxContainerResource());
+    oppContainerContext.getAppParams().setIncrementResource(
+        registerResponse.getIncrContainerResource());
+    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
+      oppContainerContext.getAppParams().setIncrementResource(
+          oppContainerContext.getAppParams().getMinResource());
     }
-    appParams.containerTokenExpiryInterval = registerResponse
-        .getContainerTokenExpiryInterval();
+    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+        registerResponse.getContainerTokenExpiryInterval());
 
-    containerIdCounter
+    oppContainerContext.getContainerIdGenerator()
         .resetContainerIdCounter(registerResponse.getContainerIdStart());
     setNodeList(registerResponse.getNodesForScheduling());
   }
 
-  /**
-   * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds to the outstanding
-   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
-   * the current YARN constraint that only a single ResourceRequest can exist at
-   * a given Priority and Capability.
-   *
-   * @param resourceAsks the list with the {@link ResourceRequest}s
-   */
-  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
-    for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
-
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
-          this.outstandingOpReqs.get(priority);
-      if (reqMap == null) {
-        reqMap = new HashMap<>();
-        this.outstandingOpReqs.put(priority, reqMap);
-      }
-
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
-      } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
-      }
-      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
-            + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
-      }
-    }
-  }
-
-  /**
-   * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest.
-   */
-  private void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
-      Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
-
-      if (asks == null)
-        continue;
-
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
-          asks.remove(capability);
-        }
-      }
-    }
-  }
-
   private void setNodeList(List<NodeId> nodeList) {
-    this.nodeList.clear();
+    oppContainerContext.getNodeMap().clear();
     addToNodeList(nodeList);
   }
 
   private void addToNodeList(List<NodeId> nodes) {
     for (NodeId n : nodes) {
-      this.nodeList.put(n.getHost(), n);
+      oppContainerContext.getNodeMap().put(n.getHost(), n);
     }
   }
 
@@ -345,52 +229,13 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       LOG.debug("Forwarding allocate request to the" +
           "Distributed Scheduler Service on YARN RM");
     }
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks =
-        partitionAskList(request.getAllocateRequest().getAskList());
-
-    List<ContainerId> releasedContainers =
-        request.getAllocateRequest().getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      containersAllocated.removeAll(releasedContainers);
-    }
-
-    // Also, update black list
-    ResourceBlacklistRequest rbr =
-        request.getAllocateRequest().getResourceBlacklistRequest();
-    if (rbr != null) {
-      blacklist.removeAll(rbr.getBlacklistRemovals());
-      blacklist.addAll(rbr.getBlacklistAdditions());
-    }
-
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
-    List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated =
-          containerAllocator.allocate(this.appParams, containerIdCounter,
-              outstandingOpReqs.get(priority).values(), blacklist,
-              applicationAttemptId, nodeList, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
-      }
-    }
+    List<Container> allocatedContainers =
+        containerAllocator.allocateContainers(
+            request.getAllocateRequest(), applicationAttemptId,
+            oppContainerContext, rmIdentifier, appSubmitter);
 
     request.setAllocatedContainers(allocatedContainers);
 
-    // Send all the GUARANTEED Reqs to RM
-    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
     DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);
 
@@ -398,7 +243,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      nodeTokens.put(nmToken.getNodeId(), nmToken);
+      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
     }
 
     List<ContainerStatus> completedContainers =
@@ -407,7 +252,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     // Only account for opportunistic containers
     for (ContainerStatus cs : completedContainers) {
       if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
+        oppContainerContext.getContainersAllocated()
+            .remove(cs.getContainerId());
       }
     }
 
@@ -417,9 +263,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Number of opportunistic containers currently allocated by" +
-              "application: " + containersAllocated.size());
+      LOG.debug("Number of opportunistic containers currently" +
+          "allocated by application: " + oppContainerContext
+          .getContainersAllocated().size());
     }
     return dsResp;
   }
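
The net effect of the hunk above: all of the per-application scheduling state that used to live in this class (the allocated-container set, the host-to-NodeId map, the NM tokens, the blacklist, and the outstanding OPPORTUNISTIC asks) now sits behind the shared OpportunisticContainerContext, and the partition/allocate/match pipeline is delegated to the common OpportunisticContainerAllocator. The one behavior the NodeManager still overrides is container-id generation. A minimal, self-contained sketch of that override pattern follows; the ContainerIdGenerator here is a hypothetical stand-in for the real class in org.apache.hadoop.yarn.server.scheduler, and the assumption that the base generator counts upward is inferred from the "decrement" override above, not shown by this diff.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in; field and method names mirror the ones visible
// in this diff.
class ContainerIdGenerator {
  protected final AtomicLong containerIdCounter = new AtomicLong(1);

  public void resetContainerIdCounter(long containerIdStart) {
    this.containerIdCounter.set(containerIdStart);
  }

  public long generateContainerId() {
    // Assumed default behavior: count upwards.
    return this.containerIdCounter.incrementAndGet();
  }
}

public class IdGeneratorDemo {
  public static void main(String[] args) {
    // Same override the DistributedScheduler installs in initLocal above:
    // hand out ids by counting *down* from the RM-supplied start value.
    ContainerIdGenerator gen = new ContainerIdGenerator() {
      @Override
      public long generateContainerId() {
        return this.containerIdCounter.decrementAndGet();
      }
    };
    gen.resetContainerIdCounter(1000);
    System.out.println(gen.generateContainerId()); // prints 999
    System.out.println(gen.generateContainerId()); // prints 998
  }
}

Counting down from the start value the RM supplies (see resetContainerIdCounter in updateParameters above) presumably keeps NM-issued opportunistic container ids clear of the ids the RM itself hands out for the same application attempt.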

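Note also what is gone from this file without replacement: partitionAskList and the "Send all the GUARANTEED Reqs to RM" step. The forwarded AllocateRequest is now handed straight to allocateContainers, so the split by ExecutionType presumably happens inside the common allocator. For reference, the partition itself is small; a sketch using the real YARN record types (the wrapper class and method name here are hypothetical, mirroring the deleted PartitionedResourceRequests):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

final class PartitionedAsks {
  final List<ResourceRequest> guaranteed = new ArrayList<>();
  final List<ResourceRequest> opportunistic = new ArrayList<>();

  // Opportunistic asks are satisfied on the NodeManager; the guaranteed
  // rest is forwarded to the ResourceManager as before.
  static PartitionedAsks partition(List<ResourceRequest> asks) {
    PartitionedAsks p = new PartitionedAsks();
    for (ResourceRequest rr : asks) {
      if (rr.getExecutionTypeRequest().getExecutionType()
          == ExecutionType.OPPORTUNISTIC) {
        p.opportunistic.add(rr);
      } else {
        p.guaranteed.add(rr);
      }
    }
    return p;
  }
}
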
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
deleted file mode 100644
index 4723233..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.ContainerType;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as possible.
- * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
- * required NM tokens for the allocated containers.
- * </p>
- */
-public class OpportunisticContainerAllocator {
-
-  private static final Log LOG =
-      LogFactory.getLog(OpportunisticContainerAllocator.class);
-
-  private static final ResourceCalculator RESOURCE_CALCULATOR =
-      new DominantResourceCalculator();
-
-  static class ContainerIdCounter {
-    final AtomicLong containerIdCounter = new AtomicLong(1);
-
-    void resetContainerIdCounter(long containerIdStart) {
-      this.containerIdCounter.set(containerIdStart);
-    }
-
-    long generateContainerId() {
-      return this.containerIdCounter.decrementAndGet();
-    }
-  }
-
-  private final NodeStatusUpdater nodeStatusUpdater;
-  private final Context context;
-  private int webpagePort;
-
-  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
-      Context context, int webpagePort) {
-    this.nodeStatusUpdater = nodeStatusUpdater;
-    this.context = context;
-    this.webpagePort = webpagePort;
-  }
-
-  public Map<Resource, List<Container>> allocate(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
-      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
-      String userName) throws YarnException {
-    Map<Resource, List<Container>> containers = new HashMap<>();
-    for (ResourceRequest anyAsk : resourceAsks) {
-      allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
-          allNodes, userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
-    }
-    return containers;
-  }
-
-  private void allocateOpportunisticContainers(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Set<String> blacklist, ApplicationAttemptId id,
-      Map<String, NodeId> allNodes, String userName,
-      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
-      throws YarnException {
-    int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ? 0 :
-            containers.get(anyAsk.getCapability()).size());
-
-    List<NodeId> nodesForScheduling = new ArrayList<>();
-    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
-      // Do not use blacklisted nodes for scheduling.
-      if (blacklist.contains(nodeEntry.getKey())) {
-        continue;
-      }
-      nodesForScheduling.add(nodeEntry.getValue());
-    }
-    int numAllocated = 0;
-    int nextNodeToSchedule = 0;
-    for (int numCont = 0; numCont < toAllocate; numCont++) {
-      nextNodeToSchedule++;
-      nextNodeToSchedule %= nodesForScheduling.size();
-      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
-      Container container = buildContainer(appParams, idCounter, anyAsk, id,
-          userName, nodeId);
-      List<Container> cList = containers.get(anyAsk.getCapability());
-      if (cList == null) {
-        cList = new ArrayList<>();
-        containers.put(anyAsk.getCapability(), cList);
-      }
-      cList.add(container);
-      numAllocated++;
-      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
-    }
-    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
-  }
-
-  private Container buildContainer(DistributedSchedulerParams appParams,
-      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
-      String userName, NodeId nodeId) throws YarnException {
-    ContainerId cId =
-        ContainerId.newContainerId(id, idCounter.generateContainerId());
-
-    // Normalize the resource asks (Similar to what the RM scheduler does
-    // before accepting an ask)
-    Resource capability = normalizeCapability(appParams, rr);
-
-    long currTime = System.currentTimeMillis();
-    ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(
-            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
-            capability, currTime + appParams.containerTokenExpiryInterval,
-            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
-            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
-            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
-            ExecutionType.OPPORTUNISTIC);
-    byte[] pwd =
-        context.getContainerTokenSecretManager().createPassword(
-            containerTokenIdentifier);
-    Token containerToken = newContainerToken(nodeId, pwd,
-        containerTokenIdentifier);
-    Container container = BuilderUtils.newContainer(
-        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
-        capability, rr.getPriority(), containerToken,
-        containerTokenIdentifier.getExecutionType(),
-        rr.getAllocationRequestId());
-    return container;
-  }
-
-  private Resource normalizeCapability(DistributedSchedulerParams appParams,
-      ResourceRequest ask) {
-    return Resources.normalize(RESOURCE_CALCULATOR,
-        ask.getCapability(), appParams.minResource, appParams.maxResource,
-        appParams.incrementResource);
-  }
-
-  public static Token newContainerToken(NodeId nodeId, byte[] password,
-      ContainerTokenIdentifier tokenIdentifier) {
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
-        nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
-    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
-        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return containerToken;
-  }
-
-}
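
Before it moved to the common package, the heart of this allocator was the round-robin spread in allocateOpportunisticContainers above. A self-contained sketch of that idea, with hypothetical plain-string node handles instead of the YARN records, plus an empty-list guard the original did not have:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RoundRobinSpread {
  // Pick target nodes for "toAllocate" containers: drop blacklisted hosts,
  // then walk the remaining nodes in order, wrapping around, so containers
  // land as evenly as possible.
  static List<String> pickNodes(Map<String, String> allNodes, // host -> node
      Set<String> blacklist, int toAllocate) {
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, String> e : allNodes.entrySet()) {
      if (!blacklist.contains(e.getKey())) {
        candidates.add(e.getValue());
      }
    }
    List<String> picks = new ArrayList<>();
    if (candidates.isEmpty()) {
      return picks; // every node blacklisted: nothing schedulable
    }
    for (int i = 0; i < toAllocate; i++) {
      picks.add(candidates.get(i % candidates.size()));
    }
    return picks;
  }
}

One quirk of the deleted loop is worth noting: nextNodeToSchedule is advanced before its first use, so scheduling effectively starts at the second node in the list; plain i % size indexing gives the same even spread without skipping node zero.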

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 4f726d4..a41f865 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index b093b3b..8f1ae7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAl
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -189,7 +190,6 @@ public class TestDistributedScheduler {
       DistributedScheduler distributedScheduler) {
     NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
     Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
-    Context context = Mockito.mock(Context.class);
     NMContainerTokenSecretManager nmContainerTokenSecretManager = new
         NMContainerTokenSecretManager(conf);
     MasterKey mKey = new MasterKey() {
@@ -207,15 +207,13 @@ public class TestDistributedScheduler {
       public void setBytes(ByteBuffer bytes) {}
     };
     nmContainerTokenSecretManager.setMasterKey(mKey);
-    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
-        (nmContainerTokenSecretManager);
     OpportunisticContainerAllocator containerAllocator =
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+        new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
 
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
     nmTokenSecretManagerInNM.setMasterKey(mKey);
-    distributedScheduler.initLocal(
+    distributedScheduler.initLocal(1234,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         containerAllocator, nmTokenSecretManagerInNM, "test");
 
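The test changes track the constructor change: the refactored allocator no longer needs a mocked NodeManager Context, since container-token creation now goes through the NMContainerTokenSecretManager handed in directly, and the RM identifier is passed to initLocal as a plain long rather than read off a NodeStatusUpdater inside the allocator. Distilled into a hypothetical helper (imports as in the test file; fixture objects and placeholder values are the ones built in the test above, with 77 presumably standing in for the old webapp port):

  private void wireScheduler(DistributedScheduler scheduler,
      NMContainerTokenSecretManager containerTokenSecretManager,
      NMTokenSecretManagerInNM nmTokenSecretManager) {
    OpportunisticContainerAllocator allocator =
        new OpportunisticContainerAllocator(containerTokenSecretManager, 77);
    scheduler.initLocal(1234,
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        allocator, nmTokenSecretManager, "test");
  }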

