hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [2/3] hadoop git commit: YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
Date Mon, 25 Apr 2016 06:02:05 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
new file mode 100644
index 0000000..42c1dcd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
+    .AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
+
+
+
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMTokenSecretManagerInNM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * <p>The LocalScheduler runs on the NodeManager and is modelled as an
+ * <code>AMRMProxy</code> request interceptor. It is responsible for the
+ * following :</p>
+ * <ul>
+ *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
+ *   response objects to extract instructions from the
+ *   <code>ClusterManager</code> running on the ResourceManager to aid in making
+ *   scheduling decisions</li>
+ *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
+ *   containers for the outstanding opportunistic resource requests</li>
+ * </ul>
+ */
+public final class LocalScheduler extends AbstractRequestInterceptor {
+
+  /**
+   * Holder for an ask list that has been split in two. Both lists start out
+   * empty and are filled in by the caller through the getters, which hand out
+   * the live, mutable lists.
+   */
+  static class PartitionedResourceRequests {
+    // The list references never change after construction, only their
+    // contents do.
+    private final List<ResourceRequest> guaranteed = new ArrayList<>();
+    private final List<ResourceRequest> opportunistic = new ArrayList<>();
+
+    /** @return the mutable list backing the guaranteed partition. */
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+
+    /** @return the mutable list backing the opportunistic partition. */
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  /**
+   * Scheduling limits applied to the opportunistic containers of this
+   * application. The fields are filled in from the DistSchedRegisterResponse
+   * and read by the OpportunisticContainerAllocator.
+   */
+  static class DistSchedulerParams {
+    // Upper bound handed to Resources.normalize for a requested capability.
+    Resource maxResource;
+    // Lower bound handed to Resources.normalize for a requested capability.
+    Resource minResource;
+    // Step handed to Resources.normalize; set to minResource when the
+    // register response carries no increment.
+    Resource incrementResource;
+    // Added to System.currentTimeMillis() to compute the container token
+    // expiry, so presumably expressed in milliseconds - TODO confirm.
+    int containerTokenExpiryInterval;
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(LocalScheduler.class);
+
+  // Currently just used to keep track of allocated Containers
+  // Can be used for reporting stats later
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+
+  private DistSchedulerParams appParams = new DistSchedulerParams();
+  private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
+      new OpportunisticContainerAllocator.ContainerIdCounter();
+  private Map<String, NodeId> nodeList = new HashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs, keyed first by
+  // Priority and then by capability (Resource). This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequest (ask)
+  final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  private ApplicationAttemptId applicationAttemptId;
+  private OpportunisticContainerAllocator containerAllocator;
+  private NMTokenSecretManagerInNM nmSecretManager;
+  private String appSubmitter;
+
+  /**
+   * Initializes the interceptor through the superclass and then wires in the
+   * application attempt id, container allocator, NM token secret manager and
+   * submitting user taken from the given context.
+   *
+   * @param appContext
+   *          AMRMProxy context of the application this interceptor serves
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    initLocal(appContext.getApplicationAttemptId(),
+        appContext.getNMCotext().getContainerAllocator(),
+        appContext.getNMCotext().getNMTokenSecretManager(),
+        appContext.getUser());
+  }
+
+  /**
+   * Stores the collaborators this scheduler works with. It is package-private
+   * and marked VisibleForTesting so that they can be supplied directly
+   * instead of through an AMRMProxyApplicationContext.
+   *
+   * @param applicationAttemptId
+   *          attempt this scheduler allocates containers for
+   * @param containerAllocator
+   *          allocator used for the OPPORTUNISTIC requests
+   * @param nmSecretManager
+   *          used to generate NM tokens for locally allocated containers
+   * @param appSubmitter
+   *          user name passed to the allocator and to NM token generation
+   */
+  @VisibleForTesting
+  void initLocal(ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerAllocator containerAllocator,
+      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
+    this.applicationAttemptId = applicationAttemptId;
+    this.containerAllocator = containerAllocator;
+    this.nmSecretManager = nmSecretManager;
+    this.appSubmitter = appSubmitter;
+  }
+
+  /**
+   * Route register call to the corresponding distributed scheduling method viz.
+   * registerApplicationMasterForDistributedScheduling, and return response to
+   * the caller after stripping away Distributed Scheduling information.
+   *
+   * @param request
+   *          registration request
+   * @return the plain register response extracted from the distributed
+   *         scheduling register response
+   * @throws YarnException
+   *           propagated from the distributed scheduling registration
+   * @throws IOException
+   *           propagated from the distributed scheduling registration
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return registerApplicationMasterForDistributedScheduling(request)
+        .getRegisterResponse();
+  }
+
+  /**
+   * Route allocate call to the allocateForDistributedScheduling method and
+   * return response to the caller after stripping away Distributed Scheduling
+   * information.
+   *
+   * @param request
+   *          allocation request
+   * @return Allocate Response
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return allocateForDistributedScheduling(request).getAllocateResponse();
+  }
+
+  /**
+   * Passes the finish request on to the next interceptor in the chain
+   * unchanged and returns whatever it answers.
+   *
+   * @param request
+   *          finish request from the application master
+   * @return response produced by the next interceptor
+   * @throws YarnException
+   *           propagated from the next interceptor
+   * @throws IOException
+   *           propagated from the next interceptor
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    FinishApplicationMasterResponse finishResponse =
+        getNextInterceptor().finishApplicationMaster(request);
+    return finishResponse;
+  }
+
+  /**
+   * Adds the locally allocated containers to the response and makes sure the
+   * response carries an NM token for every node they were placed on. A token
+   * is generated only for nodes that have no entry in {@code nodeTokens}, and
+   * at most once per node even when several containers share that node.
+   * Nothing is changed when no container was allocated.
+   *
+   * @param response
+   *          allocate response to augment
+   * @param nmTokens
+   *          NM tokens already present in the response; they are kept
+   * @param allocatedContainers
+   *          containers allocated locally during this call
+   */
+  private void updateResponseWithNMTokens(AllocateResponse response,
+      List<NMToken> nmTokens, List<Container> allocatedContainers) {
+    if (allocatedContainers.isEmpty()) {
+      return;
+    }
+    response.getAllocatedContainers().addAll(allocatedContainers);
+
+    List<NMToken> retTokens = new ArrayList<>(nmTokens);
+    // Nodes that already received a freshly generated token in this call, so
+    // that containers sharing a node do not produce duplicate tokens.
+    Set<NodeId> nodesWithNewToken = new HashSet<>();
+    for (Container alloc : allocatedContainers) {
+      NodeId nodeId = alloc.getNodeId();
+      if (!nodeTokens.containsKey(nodeId) && nodesWithNewToken.add(nodeId)) {
+        retTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
+      }
+    }
+    response.setNMTokens(retTokens);
+  }
+
+  /**
+   * Splits the ask list by execution type: requests whose execution type is
+   * OPPORTUNISTIC go to the opportunistic partition, every other request goes
+   * to the guaranteed partition. The relative order of the asks is kept.
+   *
+   * @param askList
+   *          resource requests of one allocate call
+   * @return the two partitions
+   */
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests split = new PartitionedResourceRequests();
+    for (ResourceRequest ask : askList) {
+      boolean isOpportunistic =
+          ask.getExecutionType() == ExecutionType.OPPORTUNISTIC;
+      List<ResourceRequest> target = isOpportunistic
+          ? split.getOpportunistic() : split.getGuaranteed();
+      target.add(ask);
+    }
+    return split;
+  }
+
+  /**
+   * Copies the scheduling limits from the register response into
+   * {@code appParams}, restarts the container id counter at the id given by
+   * the response and replaces the node list with the one from the response.
+   *
+   * @param registerResponse
+   *          response received for the distributed scheduling registration
+   */
+  private void updateParameters(
+      DistSchedRegisterResponse registerResponse) {
+    appParams.minResource = registerResponse.getMinAllocatableCapabilty();
+    appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+
+    // Without an explicit increment the minimum allocation doubles as step.
+    Resource increment = registerResponse.getIncrAllocatableCapabilty();
+    appParams.incrementResource =
+        (increment != null) ? increment : appParams.minResource;
+
+    appParams.containerTokenExpiryInterval =
+        registerResponse.getContainerTokenExpiryInterval();
+
+    containerIdCounter.resetContainerIdCounter(
+        registerResponse.getContainerIdStart());
+    setNodeList(registerResponse.getNodesForScheduling());
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, Capability) and adds it to the outstanding OPPORTUNISTIC
+   * requests map. The nested map is required to enforce the current YARN
+   * constraint that only a single ResourceRequest can exist at a given
+   * Priority and Capability. Asks that are not for the ANY location or that
+   * request zero containers are ignored.
+   *
+   * @param resourceAsks
+   *          OPPORTUNISTIC asks of the current allocate call
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Priority priority = request.getPriority();
+      Map<Resource, ResourceRequest> reqMap =
+          this.outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        this.outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        // The incoming request object itself is stored; its container count
+        // is updated in place by later asks and by matched allocations.
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      // Only ANY requests reach this point, so no further location check is
+      // needed before logging.
+      LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+          + ", with capability = " + request.getCapability() + " ) : "
+          + resourceRequest.getNumContainers());
+    }
+  }
+
+  /**
+   * Records the given containers as allocated and charges each of them
+   * against the outstanding OPPORTUNISTIC request stored for the container's
+   * priority and the given capability. A request whose remaining container
+   * count reaches zero is dropped from the map of its priority.
+   *
+   * @param capability
+   *          capability that was requested for these containers
+   * @param allocatedContainers
+   *          containers allocated for that capability
+   */
+  public void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container allocated : allocatedContainers) {
+      containersAllocated.add(allocated.getId());
+
+      Map<Resource, ResourceRequest> asksAtPriority =
+          outstandingOpReqs.get(allocated.getPriority());
+      ResourceRequest pending =
+          (asksAtPriority == null) ? null : asksAtPriority.get(capability);
+      if (pending == null) {
+        // Nothing outstanding for this priority / capability.
+        continue;
+      }
+
+      pending.setNumContainers(pending.getNumContainers() - 1);
+      if (pending.getNumContainers() == 0) {
+        asksAtPriority.remove(capability);
+      }
+    }
+  }
+
+  /**
+   * Replaces the current host to NodeId mapping with one built from the
+   * given nodes.
+   *
+   * @param nodeList
+   *          nodes that make up the new mapping
+   */
+  private void setNodeList(List<NodeId> nodeList) {
+    // The parameter shadows the field, hence the explicit "this".
+    this.nodeList.clear();
+    addToNodeList(nodeList);
+  }
+
+  /**
+   * Registers every given node under its host name. A node whose host is
+   * already present replaces the earlier entry.
+   *
+   * @param nodes
+   *          nodes to add to the host to NodeId mapping
+   */
+  private void addToNodeList(List<NodeId> nodes) {
+    for (NodeId node : nodes) {
+      String host = node.getHost();
+      this.nodeList.put(host, node);
+    }
+  }
+
+  /**
+   * Forwards the registration to the next interceptor and uses the response
+   * to refresh the local scheduling parameters, the container id counter and
+   * the node list before handing the response back.
+   *
+   * @param request
+   *          registration request
+   * @return the distributed scheduling register response of the next
+   *         interceptor
+   * @throws YarnException
+   *           propagated from the next interceptor
+   * @throws IOException
+   *           propagated from the next interceptor
+   */
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    // The two string parts need a separating space, otherwise the message
+    // reads "theDistributed".
+    LOG.info("Forwarding registration request to the " +
+        "Distributed Scheduler Service on YARN RM");
+    DistSchedRegisterResponse dsResp = getNextInterceptor()
+        .registerApplicationMasterForDistributedScheduling(request);
+    updateParameters(dsResp);
+    return dsResp;
+  }
+
+  /**
+   * Handles an allocate call: OPPORTUNISTIC asks are added to the outstanding
+   * requests and served locally through the container allocator, all other
+   * asks are forwarded to the next interceptor. The locally allocated
+   * containers, and NM tokens for them where needed, are then added to the
+   * response that came back.
+   *
+   * @param request
+   *          allocation request; its ask list is replaced by the
+   *          non-OPPORTUNISTIC asks before it is forwarded
+   * @return the response of the next interceptor, extended with the locally
+   *         allocated containers
+   * @throws YarnException
+   *           propagated from the allocator or the next interceptor
+   * @throws IOException
+   *           propagated from the next interceptor
+   */
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    LOG.info("Forwarding allocate request to the " +
+        "Distributed Scheduler Service on YARN RM");
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks = partitionAskList(request
+        .getAskList());
+
+    List<ContainerId> releasedContainers = request.getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      containersAllocated.removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    if (rbr != null) {
+      blacklist.removeAll(rbr.getBlacklistRemovals());
+      blacklist.addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
+      // Allocated containers :
+      //  Key = Requested Capability,
+      //  Value = List of Containers of given Cap (The actual container size
+      //          might be different than what is requested.. which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      Map<Resource, List<Container>> allocated =
+          containerAllocator.allocate(this.appParams, containerIdCounter,
+              outstandingOpReqs.get(priority).values(), blacklist,
+              applicationAttemptId, nodeList, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    // Send all the GUARANTEED Reqs to RM
+    request.setAskList(partitionedAsks.getGuaranteed());
+    DistSchedAllocateResponse dsResp =
+        getNextInterceptor().allocateForDistributedScheduling(request);
+
+    // Update host to nodeId mapping
+    setNodeList(dsResp.getNodesForScheduling());
+    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
+    for (NMToken nmToken : nmTokens) {
+      nodeTokens.put(nmToken.getNodeId(), nmToken);
+    }
+
+    List<ContainerStatus> completedContainers =
+        dsResp.getAllocateResponse().getCompletedContainersStatuses();
+
+    // Only account for opportunistic containers
+    for (ContainerStatus cs : completedContainers) {
+      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        containersAllocated.remove(cs.getContainerId());
+      }
+    }
+
+    // Check if we have NM tokens for all the allocated containers. If not
+    // generate one and update the response.
+    updateResponseWithNMTokens(
+        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Number of opportunistic containers currently allocated by " +
+              "application: " + containersAllocated.size());
+    }
+    return dsResp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
new file mode 100644
index 0000000..03ba61d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>The OpportunisticContainerAllocator allocates containers on a given list
+ * of Nodes after it modifies the container sizes to within allowable limits
+ * specified by the <code>ClusterManager</code> running on the RM. It tries to
+ * distribute the containers as evenly as possible. It also uses the
+ * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
+ * the allocated containers</p>
+ */
+public class OpportunisticContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DominantResourceCalculator();
+
+  /**
+   * Source of container ids for locally allocated containers, backed by an
+   * AtomicLong.
+   */
+  static class ContainerIdCounter {
+    // Starts at 1 until resetContainerIdCounter supplies another start value.
+    final AtomicLong containerIdCounter = new AtomicLong(1);
+
+    /**
+     * Restarts the counter.
+     * @param containerIdStart value the counter is set to
+     */
+    void resetContainerIdCounter(long containerIdStart) {
+      this.containerIdCounter.set(containerIdStart);
+    }
+
+    /**
+     * @return the counter value after decrementing it by one
+     */
+    long generateContainerId() {
+      // NOTE(review): ids count downwards from the start value; presumably
+      // this keeps them apart from ids handed out elsewhere - confirm.
+      return this.containerIdCounter.decrementAndGet();
+    }
+  }
+
+  private final NodeStatusUpdater nodeStatusUpdater;
+  private final Context context;
+  private int webpagePort;
+
+  /**
+   * Creates an allocator.
+   *
+   * @param nodeStatusUpdater
+   *          queried for the RM identifier placed in container tokens
+   * @param context
+   *          provides the container token secret manager used to sign
+   *          container tokens
+   * @param webpagePort
+   *          port appended to the node host when a container is built
+   */
+  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
+      Context context, int webpagePort) {
+    this.nodeStatusUpdater = nodeStatusUpdater;
+    this.context = context;
+    this.webpagePort = webpagePort;
+  }
+
+  /**
+   * Allocates opportunistic containers for every given ask.
+   *
+   * @param appParams
+   *          limits used to normalize the requested capabilities
+   * @param idCounter
+   *          source of container ids
+   * @param resourceAsks
+   *          asks to allocate containers for
+   * @param blacklist
+   *          hosts that must not receive containers
+   * @param appAttId
+   *          application attempt the containers belong to
+   * @param allNodes
+   *          candidate nodes, keyed by host
+   * @param userName
+   *          user recorded in the container tokens
+   * @return the allocated containers, grouped by the capability that was
+   *         requested for them; empty when nothing was allocated
+   * @throws YarnException
+   *           propagated from the per-ask allocation
+   */
+  public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
+      Set<String> blacklist, ApplicationAttemptId appAttId,
+      Map<String, NodeId> allNodes, String userName) throws YarnException {
+    Map<Resource, List<Container>> containers = new HashMap<>();
+    Set<String> nodesAllocated = new HashSet<>();
+    int numAsks = resourceAsks.size();
+    // Count containers rather than asks so that the log line below compares
+    // like with like.
+    int numRequested = 0;
+    for (ResourceRequest anyAsk : resourceAsks) {
+      numRequested += anyAsk.getNumContainers();
+      allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
+          allNodes, userName, containers, nodesAllocated, anyAsk);
+    }
+    if (numAsks > 0) {
+      int numAllocated = 0;
+      for (List<Container> perCapability : containers.values()) {
+        numAllocated += perCapability.size();
+      }
+      LOG.info("Opportunistic allocation requested for: " + numRequested
+          + " containers; allocated = " + numAllocated);
+    }
+    return containers;
+  }
+
+  /**
+   * Allocates the containers still missing for one ask and appends them to
+   * the list kept in {@code containers} under the ask's capability. Candidate
+   * nodes are all nodes that are neither blacklisted nor contained in
+   * {@code nodesAllocated}; containers are spread over them in round-robin
+   * order. Nothing is allocated when no candidate node is left.
+   *
+   * @param appParams
+   *          limits used to normalize the requested capability
+   * @param idCounter
+   *          source of container ids
+   * @param blacklist
+   *          hosts that must not receive containers
+   * @param id
+   *          application attempt the containers belong to
+   * @param allNodes
+   *          candidate nodes, keyed by host
+   * @param userName
+   *          user recorded in the container tokens
+   * @param containers
+   *          result map, keyed by requested capability; updated in place
+   * @param nodesAllocated
+   *          hosts to steer away from; only read here
+   * @param anyAsk
+   *          the ask to allocate containers for
+   * @throws YarnException
+   *           propagated from building a container
+   */
+  private void allocateOpportunisticContainers(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, Set<String> blacklist,
+      ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
+      ResourceRequest anyAsk) throws YarnException {
+    // The result map may already hold other capabilities without holding
+    // this one, so a missing entry must count as "none allocated yet".
+    List<Container> cList = containers.get(anyAsk.getCapability());
+    int toAllocate =
+        anyAsk.getNumContainers() - (cList == null ? 0 : cList.size());
+    if (toAllocate <= 0) {
+      return;
+    }
+
+    List<String> topKNodesLeft = new ArrayList<>();
+    for (String s : allNodes.keySet()) {
+      // Bias away from whatever we have already allocated and respect blacklist
+      if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+        continue;
+      }
+      topKNodesLeft.add(s);
+    }
+    if (topKNodesLeft.isEmpty()) {
+      // Without this guard the round-robin below would index into an empty
+      // list.
+      LOG.warn("No eligible node left to allocate " + toAllocate
+          + " opportunistic containers.");
+      return;
+    }
+
+    if (cList == null) {
+      cList = new ArrayList<>();
+      containers.put(anyAsk.getCapability(), cList);
+    }
+    int numAllocated = 0;
+    int nextNodeToAllocate = 0;
+    for (int numCont = 0; numCont < toAllocate; numCont++) {
+      String topNode = topKNodesLeft.get(nextNodeToAllocate);
+      nextNodeToAllocate = (nextNodeToAllocate + 1) % topKNodesLeft.size();
+      NodeId nodeId = allNodes.get(topNode);
+      Container container = buildContainer(appParams, idCounter, anyAsk, id,
+          userName, nodeId);
+      cList.add(container);
+      numAllocated++;
+      LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+    }
+  }
+
+  /**
+   * Builds one OPPORTUNISTIC container on the given node: draws a fresh
+   * container id, normalizes the requested capability, creates and signs a
+   * container token and assembles the Container record.
+   *
+   * @param appParams
+   *          normalization limits and token expiry interval
+   * @param idCounter
+   *          source of the container id
+   * @param rr
+   *          request supplying capability and priority
+   * @param id
+   *          application attempt the container belongs to
+   * @param userName
+   *          user recorded in the container token
+   * @param nodeId
+   *          node the container is placed on
+   * @return the new container
+   * @throws YarnException
+   *           declared by the signature; no statement visible here throws it
+   *           directly
+   */
+  private Container buildContainer(DistSchedulerParams appParams,
+      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
+      String userName, NodeId nodeId) throws YarnException {
+    ContainerId cId =
+        ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+    // Normalize the resource asks (Similar to what the RM scheduler does
+    // before accepting an ask)
+    Resource capability = normalizeCapability(appParams, rr);
+
+    long currTime = System.currentTimeMillis();
+    // The token expires containerTokenExpiryInterval after the current time
+    // and is marked as a TASK container with OPPORTUNISTIC execution type.
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            cId, nodeId.getHost(), userName, capability,
+            currTime + appParams.containerTokenExpiryInterval,
+            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
+            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            ExecutionType.OPPORTUNISTIC);
+    // Sign the identifier with the NM's container token secret manager.
+    byte[] pwd =
+        context.getContainerTokenSecretManager().createPassword(
+            containerTokenIdentifier);
+    Token containerToken = newContainerToken(nodeId, pwd,
+        containerTokenIdentifier);
+    // "host:webpagePort" is presumably the node's HTTP address - TODO confirm
+    // against BuilderUtils.newContainer.
+    Container container = BuilderUtils.newContainer(
+        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+        capability, rr.getPriority(), containerToken);
+    return container;
+  }
+
+  /**
+   * Normalizes the capability of the ask against the minimum, maximum and
+   * increment found in the application parameters.
+   *
+   * @param appParams
+   *          supplies the normalization limits
+   * @param ask
+   *          request whose capability is normalized
+   * @return the normalized capability
+   */
+  private Resource normalizeCapability(DistSchedulerParams appParams,
+      ResourceRequest ask) {
+    Resource requested = ask.getCapability();
+    Resource normalized = Resources.normalize(RESOURCE_CALCULATOR, requested,
+        appParams.minResource, appParams.maxResource,
+        appParams.incrementResource);
+    return normalized;
+  }
+
+  /**
+   * Wraps a container token identifier and its password into a Token whose
+   * service field is built from the address of the given node.
+   *
+   * @param nodeId
+   *          node whose host and port form the token service
+   * @param password
+   *          password created for the identifier
+   * @param tokenIdentifier
+   *          identifier to embed in the token
+   * @return the container token
+   */
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress nodeAddress =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    String service = SecurityUtil.buildTokenService(nodeAddress).toString();
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    return Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, service);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index f6169e7..86cce35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -29,7 +29,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -50,7 +53,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
   private final NMStateStoreService stateStore;
-  private NodeId nodeId;                                                      
+  private NodeId nodeId;
   
   public NMTokenSecretManagerInNM() {
     this(new NMNullStateStoreService());
@@ -276,4 +279,23 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
       LOG.error("Unable to remove master key for application " + attempt, e);
     }
   }
+
+  /**
+   * Used by the Distributed Scheduler framework to generate NMTokens. The
+   * token is created while holding the read lock.
+   *
+   * @param applicationSubmitter
+   *          user the token is created for
+   * @param container
+   *          container supplying the application attempt and the node the
+   *          token is issued for
+   * @return NMToken for the node of the container
+   */
+  public NMToken generateNMToken(
+      String applicationSubmitter, Container container) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId attemptId =
+          container.getId().getApplicationAttemptId();
+      NodeId targetNode = container.getNodeId();
+      Token nmToken =
+          createNMToken(attemptId, targetNode, applicationSubmitter);
+      return NMToken.newInstance(targetNode, nmToken);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index ddbfbb9..f126080 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -80,7 +80,7 @@ public class TestEventFlow {
     
     Context context = new NMContext(new NMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInNM(), null, null,
-        new NMNullStateStoreService()) {
+        new NMNullStateStoreService(), false) {
       @Override
       public int getHttpPort() {
         return 1234;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 2fcce1d..8c235eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -1583,7 +1583,7 @@ public class TestNodeStatusUpdater {
       protected NMContext createNMContext(
           NMContainerTokenSecretManager containerTokenSecretManager,
           NMTokenSecretManagerInNM nmTokenSecretManager,
-          NMStateStoreService store) {
+          NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
         return new MyNMContext(containerTokenSecretManager,
           nmTokenSecretManager);
       }
@@ -1818,7 +1818,7 @@ public class TestNodeStatusUpdater {
         NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager) {
       super(containerTokenSecretManager, nmTokenSecretManager, null, null,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 6c904eb..ce405f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -688,5 +689,14 @@ public abstract class BaseAMRMProxyTest {
     public QueuingContext getQueuingContext() {
       return null;
     }
+
+    public boolean isDistributedSchedulingEnabled() {
+      return false;
+    }
+
+    @Override
+    public OpportunisticContainerAllocator getContainerAllocator() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index f37129e..638e51f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest {
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 9fa3fcc..ccdfd64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -475,7 +475,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
       NMStateStoreService stateStore) {
     NMContext context = new NMContext(new NMContainerTokenSecretManager(
         conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore){
+        new ApplicationACLsManager(conf), stateStore, false){
       public int getHttpPort() {
         return HTTP_PORT;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index 1169c68..cf7ca8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -113,7 +113,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
   private static final String INVALID_JAVA_HOME = "/no/jvm/here";
   protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
index 9e08b7f..c768df1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -81,7 +81,8 @@ public class TestLocalCacheDirectoryManager {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), new NMNullStateStoreService());
+          new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+            false);
     ResourceLocalizationService service =
         new ResourceLocalizationService(null, null, null, null, nmContext);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index bca0752..c612c14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -186,7 +186,7 @@ public class TestResourceLocalizationService {
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     nmContext = new NMContext(new NMContainerTokenSecretManager(
       conf), new NMTokenSecretManagerInNM(), null,
-      new ApplicationACLsManager(conf), new NMNullStateStoreService());
+      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
   }
 
   @After
@@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), stateStore);
+          new ApplicationACLsManager(conf), stateStore, false);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
                                       dirsHandler, nmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
new file mode 100644
index 0000000..efc682a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security
+    .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestLocalScheduler {
+
+  @Test
+  public void testLocalScheduler() throws Exception {
+
+    Configuration conf = new Configuration();
+    LocalScheduler localScheduler = new LocalScheduler();
+
+    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
+    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
+    Context context = Mockito.mock(Context.class);
+    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
+        NMContainerTokenSecretManager(conf);
+    MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+    nmContainerTokenSecretManager.setMasterKey(mKey);
+    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
+        (nmContainerTokenSecretManager);
+    OpportunisticContainerAllocator containerAllocator =
+        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+
+    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+        new NMTokenSecretManagerInNM();
+    nmTokenSecretManagerInNM.setMasterKey(mKey);
+    localScheduler.initLocal(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
+        containerAllocator, nmTokenSecretManagerInNM, "test");
+
+    RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
+    localScheduler.setNextInterceptor(finalReqIntcptr);
+
+    DistSchedRegisterResponse distSchedRegisterResponse =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    distSchedRegisterResponse.setRegisterResponse(
+        Records.newRecord(RegisterApplicationMasterResponse.class));
+    distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
+    distSchedRegisterResponse.setContainerIdStart(0);
+    distSchedRegisterResponse.setMaxAllocatableCapabilty(
+        Resource.newInstance(1024, 4));
+    distSchedRegisterResponse.setMinAllocatableCapabilty(
+        Resource.newInstance(512, 2));
+    distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
+        NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
+    Mockito.when(
+        finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
+            Mockito.any(RegisterApplicationMasterRequest.class)))
+        .thenReturn(distSchedRegisterResponse);
+
+    localScheduler.registerApplicationMaster(
+        Records.newRecord(RegisterApplicationMasterRequest.class));
+
+    Mockito.when(
+        finalReqIntcptr.allocateForDistributedScheduling(
+            Mockito.any(AllocateRequest.class)))
+        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+          @Override
+          public DistSchedAllocateResponse answer(InvocationOnMock
+              invocationOnMock) throws Throwable {
+            return createAllocateResponse(Arrays.asList(
+                NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
+          }
+        });
+
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
+    guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
+    guaranteedReq.setNumContainers(5);
+    guaranteedReq.setCapability(Resource.newInstance(2048, 2));
+    guaranteedReq.setRelaxLocality(true);
+    guaranteedReq.setResourceName("*");
+    ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
+    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    opportunisticReq.setNumContainers(4);
+    opportunisticReq.setCapability(Resource.newInstance(1024, 4));
+    opportunisticReq.setPriority(Priority.newInstance(100));
+    opportunisticReq.setRelaxLocality(true);
+    opportunisticReq.setResourceName("*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 4 containers were allocated
+    AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest);
+    Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
+
+    // Verify equal distribution on hosts a and b
+    // And None on c and d
+    Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse);
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
+    Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
+
+    // New Allocate request
+    allocateRequest = Records.newRecord(AllocateRequest.class);
+    opportunisticReq = Records.newRecord(ResourceRequest.class);
+    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    opportunisticReq.setNumContainers(6);
+    opportunisticReq.setCapability(Resource.newInstance(512, 3));
+    opportunisticReq.setPriority(Priority.newInstance(100));
+    opportunisticReq.setRelaxLocality(true);
+    opportunisticReq.setResourceName("*");
+    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+    // Verify 6 containers were allocated
+    allocateResponse = localScheduler.allocate(allocateRequest);
+    Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
+
+    // Verify new containers are equally distributed on hosts c and d
+    // And None on a and b
+    allocs = mapAllocs(allocateResponse);
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
+    Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
+    Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
+    Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
+  }
+
+  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
+    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
+        (DistSchedAllocateResponse.class);
+    distSchedAllocateResponse.setAllocateResponse(
+        Records.newRecord(AllocateResponse.class));
+    distSchedAllocateResponse.setNodesForScheduling(nodes);
+    return distSchedAllocateResponse;
+  }
+
+  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
+      allocateResponse) {
+    Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
+    for (Container c : allocateResponse.getAllocatedContainers()) {
+      List<ContainerId> cIds = allocs.get(c.getNodeId());
+      if (cIds == null) {
+        cIds = new ArrayList<>();
+        allocs.put(c.getNodeId(), cIds);
+      }
+      cIds.add(c.getId());
+    }
+    return allocs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
index 84e42fc..6a72cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
@@ -96,7 +96,7 @@ public class TestContainerLogsPage {
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";
@@ -136,7 +136,7 @@ public class TestContainerLogsPage {
     when(dirsHandlerForFullDisk.getLogDirsForRead()).
         thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
     nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     nmContext.getApplications().put(appId, app);
     container.setState(ContainerState.RUNNING);
     nmContext.getContainers().put(container1, container);
@@ -158,7 +158,7 @@ public class TestContainerLogsPage {
     LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
     // Add an application and the corresponding containers
     String user = "nobody";
     long clusterTimeStamp = 1234;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java
index e64d43c..ca729f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java
@@ -62,7 +62,8 @@ public class TestNMAppsPage {
     Configuration conf = new Configuration();
     final NMContext nmcontext = new NMContext(
         new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(),
-        null, new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        null, new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+        false);
     Injector injector = WebAppTests.createMockInjector(NMContext.class,
         nmcontext, new Module() {
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
index f0c7cbc..b90c1be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
@@ -87,7 +87,7 @@ public class TestNMWebServer {
 
   private int startNMWebAppServer(String webAddr) {
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, false);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -150,7 +150,7 @@ public class TestNMWebServer {
   @Test
   public void testNMWebApp() throws IOException, YarnException {
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, false);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 1f5590c..2ac0956 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -111,7 +111,7 @@ public class TestNMWebServices extends JerseyTestBase {
       healthChecker.init(conf);
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, false);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
index 3cd8faf..b95b180 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
@@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, false);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
index 3c4a660..18239f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
@@ -136,7 +136,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null) {
+          aclsManager, null, false) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index ab94175..4f90fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -104,21 +108,27 @@ public class ApplicationMasterService extends AbstractService implements
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
-  private InetSocketAddress masterServiceAddress;
-  private Server server;
-  private final RecordFactory recordFactory =
+  protected InetSocketAddress masterServiceAddress;
+  protected Server server;
+  protected final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
-  private final RMContext rmContext;
+  protected final RMContext rmContext;
 
-  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
-    super(ApplicationMasterService.class.getName());
+  public ApplicationMasterService(String name, RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(name);
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rmContext = rmContext;
   }
 
+  public ApplicationMasterService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     masterServiceAddress = conf.getSocketAddr(
@@ -139,11 +149,8 @@ public class ApplicationMasterService extends AbstractService implements
     serverConf.set(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         SaslRpcServer.AuthMethod.TOKEN.toString());
-    this.server =
-      rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
-          serverConf, this.rmContext.getAMRMTokenSecretManager(),
-          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    this.server = getServer(rpc, serverConf, masterServiceAddress,
+        this.rmContext.getAMRMTokenSecretManager());
     
     // Enable service authorization?
     if (conf.getBoolean(
@@ -158,7 +165,7 @@ public class ApplicationMasterService extends AbstractService implements
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
-    
+
     this.server.start();
     this.masterServiceAddress =
         conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
@@ -168,6 +175,14 @@ public class ApplicationMasterService extends AbstractService implements
     super.serviceStart();
   }
 
+  protected Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    return rpc.getServer(ApplicationMasterProtocol.class, this, addr,
+        serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return this.masterServiceAddress;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
new file mode 100644
index 0000000..5210f7f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+public class DistributedSchedulingService extends ApplicationMasterService
+    implements DistributedSchedulerProtocol {
+
+  public DistributedSchedulingService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+        addr, serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    // To support applications running on NMs that DO NOT support
+    // Dist Scheduling...
+    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    DistSchedRegisterResponse dsResp = recordFactory
+        .newRecordInstance(DistSchedRegisterResponse.class);
+    dsResp.setRegisterResponse(response);
+    dsResp.setMinAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    // TODO: The actual computation of the list will happen in YARN-4412
+    // TODO: Till then, send the complete list
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+    return dsResp;
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    AllocateResponse response = allocate(request);
+    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
+        (DistSchedAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+    return dsResp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 2744bb4..2fc940b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -134,6 +134,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
+  /**
+   * Used for generation of various ids.
+   */
+  public static final int EPOCH_BIT_SHIFT = 40;
+
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   private static long clusterTimeStamp = System.currentTimeMillis();
 
@@ -1226,6 +1231,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(this.rmContext, scheduler);
+    }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 5952cc2..463bebd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -93,7 +93,8 @@ public class AppSchedulingInfo {
     this.queue = queue;
     this.user = user;
     this.activeUsersManager = activeUsersManager;
-    this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.containerIdCounter =
+        new AtomicLong(epoch << EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c282a08f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 25c558f..2443214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+
+
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.log4j.Level;
@@ -740,6 +742,21 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(getRMContext(), scheduler) {
+        @Override
+        protected void serviceStart() {
+          // override to not start rpc handler
+        }
+
+        @Override
+        protected void serviceStop() {
+          // don't do anything
+        }
+      };
+    }
     return new ApplicationMasterService(getRMContext(), scheduler) {
       @Override
       protected void serviceStart() {


Mime
View raw message