hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [50/50] [abbrv] hadoop git commit: YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)
Date Fri, 22 Apr 2016 18:59:13 GMT
YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c725b97d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c725b97d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c725b97d

Branch: refs/heads/yarn-2877
Commit: c725b97df61a019664eca8f64caefa8f6d0baab0
Parents: 77d3280
Author: Arun Suresh <asuresh@apache.org>
Authored: Thu Feb 11 13:48:36 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Fri Apr 22 11:44:05 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   5 +
 hadoop-yarn-project/CHANGES-yarn-2877.txt       |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 +-
 .../hadoop/yarn/event/EventDispatcher.java      | 137 ++++++++
 .../src/main/resources/yarn-default.xml         |  30 ++
 .../yarn/server/api/records/NodeStatus.java     |   9 +
 .../api/records/QueuedContainersStatus.java     |  45 +++
 .../api/records/impl/pb/NodeStatusPBImpl.java   |  40 ++-
 .../impl/pb/QueuedContainersStatusPBImpl.java   |  80 +++++
 .../main/proto/yarn_server_common_protos.proto  |   6 +
 .../protocolrecords/TestProtocolRecords.java    |  30 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      |  10 +
 .../monitor/ContainersMonitor.java              |   3 +
 .../monitor/ContainersMonitorImpl.java          |  12 +
 .../server/resourcemanager/ClusterMonitor.java  |  36 ++
 .../DistributedSchedulingService.java           | 162 ---------
 .../server/resourcemanager/NodeSelector.java    |  74 ++++
 .../server/resourcemanager/ResourceManager.java | 123 +------
 .../server/resourcemanager/rmnode/RMNode.java   |   4 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  29 +-
 .../rmnode/RMNodeStatusEvent.java               |   7 +
 .../DistributedSchedulingService.java           | 341 +++++++++++++++++++
 .../scheduler/distributed/TopKNodeSelector.java | 273 +++++++++++++++
 .../yarn/server/resourcemanager/MockNodes.java  |   6 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +
 .../resourcemanager/TestApplicationCleanup.java |   7 +-
 .../TestDistributedSchedulingService.java       | 170 ---------
 .../resourcemanager/TestRMDispatcher.java       |   6 +-
 .../TestResourceTrackerService.java             |   4 +-
 .../TestAMRMRPCNodeUpdates.java                 |   6 +-
 .../applicationsmanager/TestAMRestart.java      |   6 +-
 .../TestDistributedSchedulingService.java       | 179 ++++++++++
 .../distributed/TestTopKNodeSelector.java       | 201 +++++++++++
 34 files changed, 1610 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..85096ba 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -190,6 +191,10 @@ public class NodeInfo {
       return null;
     }
 
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
+
     @Override
     public ResourceUtilization getAggregatedContainersUtilization() {
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..ab82e66 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
     return Collections.EMPTY_LIST;
   }
 
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    return null;
+  }
+
   @Override
   public ResourceUtilization getAggregatedContainersUtilization() {
     return node.getAggregatedContainersUtilization();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/CHANGES-yarn-2877.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES-yarn-2877.txt b/hadoop-yarn-project/CHANGES-yarn-2877.txt
index a147866..e3b4c50 100644
--- a/hadoop-yarn-project/CHANGES-yarn-2877.txt
+++ b/hadoop-yarn-project/CHANGES-yarn-2877.txt
@@ -16,3 +16,5 @@ yarn-2877 distributed scheduling (Unreleased)
     YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator
     to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
 
+    YARN-4412. Create ClusterMonitor to compute ordered list of preferred
+    NMs for OPPORTUNISTIC containers (asuresh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a071841..f55e64c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -331,8 +331,21 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.incr-vcores";
   public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
 
-  /** Container token expiry for container allocated via Distributed
-   * Scheduling. */
+  public static final String DIST_SCHEDULING_TOP_K =
+      YARN_PREFIX + "distributed-scheduling.top-k";
+  public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+
+  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
+      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
+  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
+
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
+      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+      "WAIT_TIME";
+
+  /** Container token expiry for container allocated via
+   * Distributed Scheduling */
   public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
       YARN_PREFIX + "distributed-scheduling.container-token-expiry";
   public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
new file mode 100644
index 0000000..8a5ad92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * This is a specialized EventHandler to be used by Services that are expected
+ * to handle a large number of events efficiently by ensuring that the caller
+ * thread is not blocked. Events are immediately stored in a BlockingQueue and
+ * a separate dedicated Thread consumes events from the queue and handles
+ * them appropriately.
+ * @param <T> Type of Event
+ */
+public class EventDispatcher<T extends Event> extends
+    AbstractService implements EventHandler<T> {
+
+  private final EventHandler<T> handler;
+  private final BlockingQueue<T> eventQueue =
+      new LinkedBlockingDeque<>();
+  private final Thread eventProcessor;
+  private volatile boolean stopped = false;
+  private boolean shouldExitOnError = false;
+
+  private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
+
+  private final class EventProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      T event;
+
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+        } catch (InterruptedException e) {
+          LOG.error("Returning, interrupted : " + e);
+          return; // TODO: Kill RM.
+        }
+
+        try {
+          handler.handle(event);
+        } catch (Throwable t) {
+          // An error occurred, but we are shutting down anyway.
+          // If it was an InterruptedException, the very act of
+          // shutdown could have caused it and is probably harmless.
+          if (stopped) {
+            LOG.warn("Exception during shutdown: ", t);
+            break;
+          }
+          LOG.fatal("Error in handling event type " + event.getType()
+              + " to the Event Dispatcher", t);
+          if (shouldExitOnError
+              && !ShutdownHookManager.get().isShutdownInProgress()) {
+            LOG.info("Exiting, bbye..");
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public EventDispatcher(EventHandler<T> handler, String name) {
+    super(name);
+    this.handler = handler;
+    this.eventProcessor = new Thread(new EventProcessor());
+    this.eventProcessor.setName(getName() + ":Event Processor");
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.shouldExitOnError =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.eventProcessor.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    this.stopped = true;
+    this.eventProcessor.interrupt();
+    try {
+      this.eventProcessor.join();
+    } catch (InterruptedException e) {
+      throw new YarnRuntimeException(e);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(T event) {
+    try {
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of " + getName() + " event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.info("Very low remaining capacity on " + getName() + "" +
+            "event queue: " + remCapacity);
+      }
+      this.eventQueue.put(event);
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted. Trying to exit gracefully.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 560d548..12c7a5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2317,6 +2317,36 @@
     <value>4096</value>
   </property>
 
+  <!-- Distributed Scheduling configuration -->
+  <property>
+    <description>
+      The interval in milliseconds specifying the frequency at which the
+      Distributed Scheduling Cluster Monitor will recompute the top K
+      viable nodes on which OPPORTUNISTIC containers can be scheduled
+    </description>
+    <name>yarn.distributed-scheduling.top-k-compute-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+      The Default comparator used by the Distributed Scheduling Cluster
+      Monitor to order the top K nodes on which OPPORTUNISTIC containers can
+      be scheduled. The allowed values are "WAIT_TIME" and "QUEUE_LENGTH"
+    </description>
+    <name>yarn.distributed-scheduling.top-k-comparator</name>
+    <value>WAIT_TIME</value>
+  </property>
+
+  <property>
+    <description>
+      The max number of nodes returned by the Distributed Scheduling Cluster
+      Monitor. (The value of K in top-K)
+    </description>
+    <name>yarn.distributed-scheduling.top-k</name>
+    <value>10</value>
+  </property>
+
   <!-- Node Labels Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 836cd4b..89e054b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -122,4 +122,13 @@ public abstract class NodeStatus {
   @Unstable
   public abstract void setIncreasedContainers(
       List<Container> increasedContainers);
+
+  @Private
+  @Unstable
+  public abstract QueuedContainersStatus getQueuedContainersStatus();
+
+  @Private
+  @Unstable
+  public abstract void setQueuedContainersStatus(
+      QueuedContainersStatus queuedContainersStatus);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
new file mode 100644
index 0000000..a7f0ece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * <code>QueuedContainersStatus</code> captures information pertaining to the
+ * state of execution of the Queueable containers within a node.
+ * </p>
+ */
+@Private
+@Evolving
+public abstract class QueuedContainersStatus {
+  public static QueuedContainersStatus newInstance() {
+    return Records.newRecord(QueuedContainersStatus.class);
+  }
+
+  public abstract int getEstimatedQueueWaitTime();
+
+  public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
+
+  public abstract int getWaitQueueLength();
+
+  public abstract void setWaitQueueLength(int queueWaitTime);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8dd4832..9a9a83a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
     this.increasedContainers = increasedContainers;
   }
 
+  @Override
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    NodeStatusProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    if (!p.hasQueuedContainerStatus()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getQueuedContainerStatus());
+  }
+
+  @Override
+  public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
+    maybeInitBuilder();
+    if (queuedContainersStatus == null) {
+      this.builder.clearQueuedContainerStatus();
+      return;
+    }
+    this.builder.setQueuedContainerStatus(
+        convertToProtoFormat(queuedContainersStatus));
+  }
+
   private NodeIdProto convertToProtoFormat(NodeId nodeId) {
     return ((NodeIdPBImpl)nodeId).getProto();
   }
@@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
     return ((ApplicationIdPBImpl)c).getProto();
   }
 
-  private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
+  private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
     return ((ResourceUtilizationPBImpl) r).getProto();
   }
 
   private ResourceUtilizationPBImpl convertFromProtoFormat(
-      ResourceUtilizationProto p) {
+      YarnProtos.ResourceUtilizationProto p) {
     return new ResourceUtilizationPBImpl(p);
   }
 
+  private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
+      QueuedContainersStatus r) {
+    return ((QueuedContainersStatusPBImpl) r).getProto();
+  }
+
+  private QueuedContainersStatus convertFromProtoFormat(
+      YarnServerCommonProtos.QueuedContainersStatusProto p) {
+    return new QueuedContainersStatusPBImpl(p);
+  }
+
   private ContainerPBImpl convertFromProtoFormat(
       ContainerProto c) {
     return new ContainerPBImpl(c);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
new file mode 100644
index 0000000..54470f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+
+public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
+
+  private YarnServerCommonProtos.QueuedContainersStatusProto proto =
+      YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
+  private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public QueuedContainersStatusPBImpl() {
+    builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
+  }
+
+  public QueuedContainersStatusPBImpl(YarnServerCommonProtos
+      .QueuedContainersStatusProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder =
+          YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getEstimatedQueueWaitTime() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getEstimatedQueueWaitTime();
+  }
+
+  @Override
+  public void setEstimatedQueueWaitTime(int queueWaitTime) {
+    maybeInitBuilder();
+    builder.setEstimatedQueueWaitTime(queueWaitTime);
+  }
+
+  @Override
+  public int getWaitQueueLength() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getWaitQueueLength();
+  }
+
+  @Override
+  public void setWaitQueueLength(int waitQueueLength) {
+    maybeInitBuilder();
+    builder.setWaitQueueLength(waitQueueLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 77064a0..c23d557 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -39,6 +39,12 @@ message NodeStatusProto {
   optional ResourceUtilizationProto containers_utilization = 6;
   optional ResourceUtilizationProto node_utilization = 7;
   repeated ContainerProto increased_containers = 8;
+  optional QueuedContainersStatusProto queued_container_status = 9;
+}
+
+message QueuedContainersStatusProto {
+  optional int32 estimated_queue_wait_time = 1;
+  optional int32 wait_queue_length = 2;
 }
 
 message MasterKeyProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 86e49f0..27bdfff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+    .NodeHeartbeatRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -131,4 +137,28 @@ public class TestProtocolRecords {
           ((NodeHeartbeatResponsePBImpl) record).getProto());
     Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
   }
+
+  @Test
+  public void testNodeHeartBeatRequest() throws IOException {
+    NodeHeartbeatRequest record =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatus =
+        Records.newRecord(NodeStatus.class);
+    QueuedContainersStatus queuedContainersStatus = Records.newRecord
+        (QueuedContainersStatus.class);
+    queuedContainersStatus.setEstimatedQueueWaitTime(123);
+    queuedContainersStatus.setWaitQueueLength(321);
+    nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
+    record.setNodeStatus(nodeStatus);
+
+    NodeHeartbeatRequestPBImpl pb = new
+        NodeHeartbeatRequestPBImpl(
+        ((NodeHeartbeatRequestPBImpl) record).getProto());
+
+    Assert.assertEquals(123,
+        pb.getNodeStatus()
+            .getQueuedContainersStatus().getEstimatedQueueWaitTime());
+    Assert.assertEquals(321,
+        pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 72769bf..0b8bf4ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -449,9 +451,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           createKeepAliveApplicationList(), nodeHealthStatus,
           containersUtilization, nodeUtilization, increasedContainers);
 
+    nodeStatus.setQueuedContainersStatus(getContainerQueueInfo());
     return nodeStatus;
   }
 
+  private QueuedContainersStatus getContainerQueueInfo() {
+    ContainerManagerImpl containerManager =
+        (ContainerManagerImpl) this.context.getContainerManager();
+    ContainersMonitor containersMonitor =
+        containerManager.getContainersMonitor();
+    return containersMonitor.getQueuedContainersStatus();
+  }
   /**
    * Get the aggregated utilization of the containers in this node.
    * @return Resource utilization of all the containers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 1069b4f..fffcd66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
@@ -44,4 +45,6 @@ public interface ContainersMonitor extends Service,
 
   void subtractNodeResourcesFromResourceUtilization(
       ResourceUtilization resourceUtil);
+
+  QueuedContainersStatus getQueuedContainersStatus();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 0feac3b..37d610d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -87,6 +88,7 @@ public class ContainersMonitorImpl extends AbstractService implements
   private ResourceUtilization containersAllocation;
 
   private volatile boolean stopped = false;
+  private QueuedContainersStatus queuedContainersStatus;
 
   public ContainersMonitorImpl(ContainerExecutor exec,
       AsyncDispatcher dispatcher, Context context) {
@@ -100,6 +102,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
     this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.queuedContainersStatus = QueuedContainersStatus.newInstance();
   }
 
   @Override
@@ -777,6 +780,15 @@ public class ContainersMonitorImpl extends AbstractService implements
         maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
   }
 
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    return this.queuedContainersStatus;
+  }
+
+  public void setQueuedContainersStatus(QueuedContainersStatus
+      queuedContainersStatus) {
+    this.queuedContainersStatus = queuedContainersStatus;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void handle(ContainersMonitorEvent monitoringEvent) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
new file mode 100644
index 0000000..4fd62d0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.List;
+
+public interface ClusterMonitor {
+
+  void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
+
+  void removeNode(RMNode removedRMNode);
+
+  void nodeUpdate(RMNode rmNode);
+
+  void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
deleted file mode 100644
index 5210f7f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
-    .AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-
-public class DistributedSchedulingService extends ApplicationMasterService
-    implements DistributedSchedulerProtocol {
-
-  public DistributedSchedulingService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
-  }
-
-  @Override
-  public Server getServer(YarnRPC rpc, Configuration serverConf,
-      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running no NMs that DO NOT support
-    // Dist Scheduling...
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.registerApplicationMaster(request);
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.finishApplicationMaster(request);
-  }
-
-  @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
-    return super.allocate(request);
-  }
-
-  @Override
-  public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling(
-      RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    RegisterApplicationMasterResponse response =
-        registerApplicationMaster(request);
-    DistSchedRegisterResponse dsResp = recordFactory
-        .newRecordInstance(DistSchedRegisterResponse.class);
-    dsResp.setRegisterResponse(response);
-    dsResp.setMinAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setMaxAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setIncrAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setContainerTokenExpiryInterval(
-        getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
-            YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
-    dsResp.setContainerIdStart(
-        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
-    // Set nodes to be used for scheduling
-    // TODO: The actual computation of the list will happen in YARN-4412
-    // TODO: Till then, send the complete list
-    dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
-    return dsResp;
-  }
-
-  @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
-    AllocateResponse response = allocate(request);
-    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
-        (DistSchedAllocateResponse.class);
-    dsResp.setAllocateResponse(response);
-    dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
-    return dsResp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
new file mode 100644
index 0000000..d6a031c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Simple Node selector interface contractually obligating the implementor to
+ * provide the caller with an ordered list of nodes. It also provides
+ * convenience methods to specify selection criteria.
+ */
+public interface NodeSelector {
+
+  /**
+   * SelectionHint allows callers to provide additional suggestions to be
+   * used for selection
+   */
+  class SelectionHint {
+
+    private final NodeId[] nodeIds;
+
+    // minimum number of nodes to include from the Hint in the returned list
+    private final int minToInclude;
+
+    public SelectionHint(Collection<NodeId> nodeIds,
+        int minNodesToInclude) {
+      this.nodeIds = nodeIds.toArray(new NodeId[0]);
+      this.minToInclude = minNodesToInclude;
+    }
+
+    public NodeId[] getNodeIds() {
+      return nodeIds;
+    }
+
+    public int getMinToInclude() {
+      return minToInclude;
+    }
+
+  }
+
+  /**
+   * Select an ordered list of Nodes based on the Implementation
+   * @return Ordered list of Nodes
+   */
+  List<NodeId> selectNodes();
+
+  /**
+   * Select an ordered list of Nodes based on the Implementation. Also
+   * allows callers to specify some hints in terms of specific node or
+   * list of nodes (as well as how many from the list are needed)
+   * @return Ordered list of Nodes
+   */
+  List<NodeId> selectNodes(Collection<SelectionHint> hints);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 2fc940b..1fe3e83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ExitUtil;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.DistributedSchedulingService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -118,8 +119,6 @@ import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -370,7 +369,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-    return new SchedulerEventDispatcher(this.scheduler);
+    return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
   }
 
   protected Dispatcher createDispatcher() {
@@ -726,108 +725,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   @Private
-  public static class SchedulerEventDispatcher extends AbstractService
-      implements EventHandler<SchedulerEvent> {
-
-    private final ResourceScheduler scheduler;
-    private final BlockingQueue<SchedulerEvent> eventQueue =
-      new LinkedBlockingQueue<SchedulerEvent>();
-    private volatile int lastEventQueueSizeLogged = 0;
-    private final Thread eventProcessor;
-    private volatile boolean stopped = false;
-    private boolean shouldExitOnError = false;
-
-    public SchedulerEventDispatcher(ResourceScheduler scheduler) {
-      super(SchedulerEventDispatcher.class.getName());
-      this.scheduler = scheduler;
-      this.eventProcessor = new Thread(new EventProcessor());
-      this.eventProcessor.setName("ResourceManager Event Processor");
-    }
-
-    @Override
-    protected void serviceInit(Configuration conf) throws Exception {
-      this.shouldExitOnError =
-          conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
-      super.serviceInit(conf);
-    }
-
-    @Override
-    protected void serviceStart() throws Exception {
-      this.eventProcessor.start();
-      super.serviceStart();
-    }
-
-    private final class EventProcessor implements Runnable {
-      @Override
-      public void run() {
-
-        SchedulerEvent event;
-
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
-            return; // TODO: Kill RM.
-          }
-
-          try {
-            scheduler.handle(event);
-          } catch (Throwable t) {
-            // An error occurred, but we are shutting down anyway.
-            // If it was an InterruptedException, the very act of 
-            // shutdown could have caused it and is probably harmless.
-            if (stopped) {
-              LOG.warn("Exception during shutdown: ", t);
-              break;
-            }
-            LOG.fatal("Error in handling event type " + event.getType()
-                + " to the scheduler", t);
-            if (shouldExitOnError
-                && !ShutdownHookManager.get().isShutdownInProgress()) {
-              LOG.info("Exiting, bbye..");
-              System.exit(-1);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    protected void serviceStop() throws Exception {
-      this.stopped = true;
-      this.eventProcessor.interrupt();
-      try {
-        this.eventProcessor.join();
-      } catch (InterruptedException e) {
-        throw new YarnRuntimeException(e);
-      }
-      super.serviceStop();
-    }
-
-    @Override
-    public void handle(SchedulerEvent event) {
-      try {
-        int qSize = eventQueue.size();
-        if (qSize != 0 && qSize % 1000 == 0
-            && lastEventQueueSizeLogged != qSize) {
-          lastEventQueueSizeLogged = qSize;
-          LOG.info("Size of scheduler event-queue is " + qSize);
-        }
-        int remCapacity = eventQueue.remainingCapacity();
-        if (remCapacity < 1000) {
-          LOG.info("Very low remaining capacity on scheduler event queue: "
-              + remCapacity);
-        }
-        this.eventQueue.put(event);
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted. Trying to exit gracefully.");
-      }
-    }
-  }
-
-  @Private
   public static class RMFatalEventDispatcher
       implements EventHandler<RMFatalEvent> {
 
@@ -1234,7 +1131,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingService(this.rmContext, scheduler);
+      DistributedSchedulingService distributedSchedulingService = new
+          DistributedSchedulingService(this.rmContext, scheduler);
+      EventDispatcher distSchedulerEventDispatcher =
+          new EventDispatcher(distributedSchedulingService,
+              DistributedSchedulingService.class.getName());
+      // Add an event dispatcher for the DistributedSchedulingService
+      // to handle node updates/additions and removals.
+      // Since the SchedulerEvent is currently a superset of these,
+      // we register interest for it.
+      addService(distSchedulerEventDispatcher);
+      rmDispatcher.register(SchedulerEventType.class,
+          distSchedulerEventDispatcher);
+      return distributedSchedulingService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..3bf9538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 
 /**
  * Node managers information on available resources 
@@ -168,4 +169,7 @@ public interface RMNode {
       NodeHeartbeatResponse response);
   
   public List<Container> pullNewlyIncreasedContainers();
+
+  public QueuedContainersStatus getQueuedContainersStatus();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 9b80716..3179169 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@@ -125,6 +126,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* Resource utilization for the node. */
   private ResourceUtilization nodeUtilization;
 
+  /* Container Queue Information for the node. Used by Distributed Scheduler */
+  private QueuedContainersStatus queuedContainersStatus;
+
   private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
@@ -1121,7 +1125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-
+      rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
       NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
           rmNode, statusEvent);
       NodeState initialState = rmNode.getState();
@@ -1383,4 +1387,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public Resource getOriginalTotalCapability() {
     return this.originalTotalCapability;
   }
- }
+
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    this.readLock.lock();
+
+    try {
+      return this.queuedContainersStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void setQueuedContainersStatus(QueuedContainersStatus
+      queuedContainersStatus) {
+    this.writeLock.lock();
+
+    try {
+      this.queuedContainersStatus = queuedContainersStatus;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index ba6ac9b..5eeaabe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.logAggregationReportsForApps;
   }
 
+  public QueuedContainersStatus getContainerQueueInfo() {
+    return this.nodeStatus.getQueuedContainersStatus();
+  }
+
   public void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
@@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getIncreasedContainers() == null ?
         Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
   }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c725b97d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
new file mode 100644
index 0000000..f0235f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The DistributedSchedulingService is started instead of the
+ * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulerProtocol.
+ */
+public class DistributedSchedulingService extends ApplicationMasterService
+    implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(DistributedSchedulingService.class);
+
+  private final ClusterMonitor clusterMonitor;
+  private final NodeSelector nodeSelector;
+
+  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+      new ConcurrentHashMap<>();
+
+  public DistributedSchedulingService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+    int k = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+    long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
+    TopKNodeSelector.TopKComparator comparator =
+        TopKNodeSelector.TopKComparator.valueOf(
+            rmContext.getYarnConfiguration().get(
+                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
+                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
+    TopKNodeSelector topKSelector =
+        new TopKNodeSelector(k, topKComputationInterval, comparator);
+    this.clusterMonitor = topKSelector;
+    this.nodeSelector = topKSelector;
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+        addr, serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    // To support applications running on NMs that DO NOT support
+    // Dist Scheduling, the server multiplexes both the
+    // ApplicationMasterProtocol and the DistributedSchedulerProtocol.
+    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    DistSchedRegisterResponse dsResp = recordFactory
+        .newRecordInstance(DistSchedRegisterResponse.class);
+    dsResp.setRegisterResponse(response);
+    dsResp.setMinAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.nodeSelector.selectNodes()));
+    return dsResp;
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    AllocateResponse response = allocate(request);
+    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
+        (DistSchedAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(
+            this.nodeSelector.selectNodes(createSelectionHints(request))));
+    return dsResp;
+  }
+
+  /**
+   * Creates the selection hints. Essentially, it checks for requests that
+   * have relax locality set to false and are Rack local or Node local.
+   * If so, it will add all Nodes in that rack / node (if multiple NMs are
+   * running on the node) to the selection criteria. It will also set the
+   * min number of nodes required (1 in case of node local reqs) to be equal
+   * to the number of containers required, so that the local RM can spread
+   * the containers across the returned nodes.
+   * @param request the AllocateRequest whose ask list is inspected
+   * @return Collection of SelectionHint
+   */
+  private Collection<NodeSelector.SelectionHint> createSelectionHints(
+      AllocateRequest request) {
+    List<NodeSelector.SelectionHint> retList = new ArrayList<>();
+    // TODO: Add support for node labels (support obtaining a list of nodes
+    //       given a label expression)
+    for (ResourceRequest rr : request.getAskList()) {
+      if (!rr.getRelaxLocality()
+          && rackToNode.containsKey(rr.getResourceName())) {
+        retList.add(new NodeSelector.SelectionHint(
+            rackToNode.get(rr.getResourceName()), rr.getNumContainers()));
+      }
+      if (!rr.getRelaxLocality()
+          && hostToNode.containsKey(rr.getResourceName())) {
+        retList.add(new NodeSelector.SelectionHint(
+            hostToNode.get(rr.getResourceName()), 1));
+      }
+    }
+    return retList;
+  }
+
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.add(nodeId);
+      }
+    }
+  }
+
+  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) { // rackName is the map key (rack or host)
+    Set<NodeId> nodeIds = (rackName == null) ? null : mapping.get(rackName);
+    if (nodeIds != null) { // key never registered: nothing to remove, avoid NPE
+      synchronized (nodeIds) { // same per-set monitor that addToMapping uses
+        nodeIds.remove(nodeId);
+      }
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+      case NODE_ADDED:
+        if (!(event instanceof NodeAddedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+        clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
+            nodeAddedEvent.getAddedRMNode());
+        addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+            nodeAddedEvent.getAddedRMNode().getNodeID());
+        addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+            nodeAddedEvent.getAddedRMNode().getNodeID());
+        break;
+      case NODE_REMOVED:
+        if (!(event instanceof NodeRemovedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeRemovedSchedulerEvent nodeRemovedEvent =
+            (NodeRemovedSchedulerEvent)event;
+        clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+        removeFromMapping(rackToNode,
+            nodeRemovedEvent.getRemovedRMNode().getRackName(),
+            nodeRemovedEvent.getRemovedRMNode().getNodeID());
+        removeFromMapping(hostToNode,
+            nodeRemovedEvent.getRemovedRMNode().getHostName(),
+            nodeRemovedEvent.getRemovedRMNode().getNodeID());
+        break;
+      case NODE_UPDATE:
+        if (!(event instanceof NodeUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+        clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
+        break;
+      case NODE_RESOURCE_UPDATE:
+        if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+            (NodeResourceUpdateSchedulerEvent)event;
+        clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+            nodeResourceUpdatedEvent.getResourceOption());
+        break;
+
+      // <-- IGNORED EVENTS : START -->
+      case APP_ADDED:
+        break;
+      case APP_REMOVED:
+        break;
+      case APP_ATTEMPT_ADDED:
+        break;
+      case APP_ATTEMPT_REMOVED:
+        break;
+      case CONTAINER_EXPIRED:
+        break;
+      case NODE_LABELS_UPDATE:
+        break;
+      // <-- IGNORED EVENTS : END -->
+      default:
+        LOG.error("Unknown event arrived at DistributedSchedulingService: "
+            + event.toString());
+    }
+  }
+
+}


Mime
View raw message