hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [19/50] [abbrv] hadoop git commit: YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh)
Date Tue, 09 Aug 2016 22:34:01 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
deleted file mode 100644
index 843ac09..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
-
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The DistributedSchedulingAMService is started instead of the
- * ApplicationMasterService if distributed scheduling is enabled for the YARN
- * cluster.
- * It extends the functionality of the ApplicationMasterService by servicing
- * clients (AMs and AMRMProxy request interceptors) that understand the
- * DistributedSchedulingProtocol.
- */
-public class DistributedSchedulingAMService extends ApplicationMasterService
-    implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
-
-  private static final Log LOG =
-      LogFactory.getLog(DistributedSchedulingAMService.class);
-
-  private final NodeQueueLoadMonitor nodeMonitor;
-
-  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
-      new ConcurrentHashMap<>();
-  private final int k;
-
-  public DistributedSchedulingAMService(RMContext rmContext,
-                                      YarnScheduler scheduler) {
-    super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
-    this.k = rmContext.getYarnConfiguration().getInt(
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
-    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
-        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
-        YarnConfiguration.
-            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
-    NodeQueueLoadMonitor.LoadComparator comparator =
-        NodeQueueLoadMonitor.LoadComparator.valueOf(
-            rmContext.getYarnConfiguration().get(
-                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
-                YarnConfiguration.
-                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
-
-    NodeQueueLoadMonitor topKSelector =
-        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
-
-    float sigma = rmContext.getYarnConfiguration()
-        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
-            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
-
-    int limitMin, limitMax;
-
-    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
-      limitMin = rmContext.getYarnConfiguration()
-          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
-      limitMax = rmContext.getYarnConfiguration()
-          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
-    } else {
-      limitMin = rmContext.getYarnConfiguration()
-          .getInt(
-              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
-      limitMax = rmContext.getYarnConfiguration()
-          .getInt(
-              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
-    }
-
-    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
-    this.nodeMonitor = topKSelector;
-  }
-
-  @Override
-  public Server getServer(YarnRPC rpc, Configuration serverConf,
-      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running on NMs that DO NOT support
-    // Dist Scheduling... The server multiplexes both the
-    // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.registerApplicationMaster(request);
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.finishApplicationMaster(request);
-  }
-
-  @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
-    return super.allocate(request);
-  }
-
-  @Override
-  public RegisterDistributedSchedulingAMResponse
-  registerApplicationMasterForDistributedScheduling(
-      RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    RegisterApplicationMasterResponse response =
-        registerApplicationMaster(request);
-    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
-        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
-    dsResp.setRegisterResponse(response);
-    dsResp.setMinContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
-                YarnConfiguration.
-                    DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setMaxContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setIncrContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
-                YarnConfiguration.
-                    DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setContainerTokenExpiryInterval(
-        getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
-            YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
-    dsResp.setContainerIdStart(
-        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
-    // Set nodes to be used for scheduling
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
-    return dsResp;
-  }
-
-  @Override
-  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
-      DistributedSchedulingAllocateRequest request)
-      throws YarnException, IOException {
-    List<Container> distAllocContainers = request.getAllocatedContainers();
-    for (Container container : distAllocContainers) {
-      // Create RMContainer
-      SchedulerApplicationAttempt appAttempt =
-          ((AbstractYarnScheduler) rmContext.getScheduler())
-              .getCurrentAttemptForContainer(container.getId());
-      RMContainer rmContainer = new RMContainerImpl(container,
-          appAttempt.getApplicationAttemptId(), container.getNodeId(),
-          appAttempt.getUser(), rmContext, true);
-      appAttempt.addRMContainer(container.getId(), rmContainer);
-      rmContainer.handle(
-          new RMContainerEvent(container.getId(),
-              RMContainerEventType.LAUNCHED));
-    }
-    AllocateResponse response = allocate(request.getAllocateRequest());
-    DistributedSchedulingAllocateResponse dsResp = recordFactory
-        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
-    dsResp.setAllocateResponse(response);
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
-    return dsResp;
-  }
-
-  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-                            String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.add(nodeId);
-      }
-    }
-  }
-
-  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-                                 String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.remove(nodeId);
-      }
-    }
-  }
-
-  @Override
-  public void handle(SchedulerEvent event) {
-    switch (event.getType()) {
-    case NODE_ADDED:
-      if (!(event instanceof NodeAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
-      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
-          nodeAddedEvent.getAddedRMNode());
-      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      break;
-    case NODE_REMOVED:
-      if (!(event instanceof NodeRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeRemovedSchedulerEvent nodeRemovedEvent =
-          (NodeRemovedSchedulerEvent) event;
-      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-      removeFromMapping(rackToNode,
-          nodeRemovedEvent.getRemovedRMNode().getRackName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      removeFromMapping(hostToNode,
-          nodeRemovedEvent.getRemovedRMNode().getHostName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      break;
-    case NODE_UPDATE:
-      if (!(event instanceof NodeUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
-          event;
-      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
-      break;
-    case NODE_RESOURCE_UPDATE:
-      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
-          (NodeResourceUpdateSchedulerEvent) event;
-      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
-          nodeResourceUpdatedEvent.getResourceOption());
-      break;
-
-    // <-- IGNORED EVENTS : START -->
-    case APP_ADDED:
-      break;
-    case APP_REMOVED:
-      break;
-    case APP_ATTEMPT_ADDED:
-      break;
-    case APP_ATTEMPT_REMOVED:
-      break;
-    case CONTAINER_EXPIRED:
-      break;
-    case NODE_LABELS_UPDATE:
-      break;
-    // <-- IGNORED EVENTS : END -->
-    default:
-      LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
-          + event.toString());
-    }
-
-  }
-
-  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
-    return nodeMonitor.getThresholdCalculator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
new file mode 100644
index 0000000..a473b14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The OpportunisticContainerAllocatorAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
+public class OpportunisticContainerAllocatorAMService
+    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
+    EventHandler<SchedulerEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
+
+  private final NodeQueueLoadMonitor nodeMonitor;
+
+  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+      new ConcurrentHashMap<>();
+  private final int k;
+
+  public OpportunisticContainerAllocatorAMService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(OpportunisticContainerAllocatorAMService.class.getName(),
+        rmContext, scheduler);
+    this.k = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
+    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
+        YarnConfiguration.
+            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+    NodeQueueLoadMonitor.LoadComparator comparator =
+        NodeQueueLoadMonitor.LoadComparator.valueOf(
+            rmContext.getYarnConfiguration().get(
+                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
+                YarnConfiguration.
+                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
+
+    NodeQueueLoadMonitor topKSelector =
+        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+
+    float sigma = rmContext.getYarnConfiguration()
+        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
+            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
+
+    int limitMin, limitMax;
+
+    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
+    } else {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
+    }
+
+    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
+    this.nodeMonitor = topKSelector;
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
+      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
+          addr, serverConf, secretManager,
+          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+      // To support application running on NMs that DO NOT support
+      // Dist Scheduling... The server multiplexes both the
+      // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
+      ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+          ApplicationMasterProtocolPB.class,
+          ApplicationMasterProtocolService.newReflectiveBlockingService(
+              new ApplicationMasterProtocolPBServiceImpl(this)));
+      return server;
+    }
+    return super.getServer(rpc, serverConf, addr, secretManager);
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public RegisterDistributedSchedulingAMResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+    dsResp.setRegisterResponse(response);
+    dsResp.setMinContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
+                YarnConfiguration.
+                    OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+                YarnConfiguration
+                    .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrContainerResource(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
+                YarnConfiguration.
+                    OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    dsResp.setNodesForScheduling(
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
+    return dsResp;
+  }
+
+  @Override
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
+    List<Container> distAllocContainers = request.getAllocatedContainers();
+    for (Container container : distAllocContainers) {
+      // Create RMContainer
+      SchedulerApplicationAttempt appAttempt =
+          ((AbstractYarnScheduler) rmContext.getScheduler())
+              .getCurrentAttemptForContainer(container.getId());
+      RMContainer rmContainer = new RMContainerImpl(container,
+          appAttempt.getApplicationAttemptId(), container.getNodeId(),
+          appAttempt.getUser(), rmContext, true);
+      appAttempt.addRMContainer(container.getId(), rmContainer);
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(),
+              RMContainerEventType.LAUNCHED));
+    }
+    AllocateResponse response = allocate(request.getAllocateRequest());
+    DistributedSchedulingAllocateResponse dsResp = recordFactory
+        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
+    return dsResp;
+  }
+
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+                            String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.add(nodeId);
+      }
+    }
+  }
+
+  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+                                 String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.remove(nodeId);
+      }
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+    case NODE_ADDED:
+      if (!(event instanceof NodeAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
+      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+          nodeAddedEvent.getAddedRMNode());
+      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      break;
+    case NODE_REMOVED:
+      if (!(event instanceof NodeRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeRemovedSchedulerEvent nodeRemovedEvent =
+          (NodeRemovedSchedulerEvent) event;
+      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+      removeFromMapping(rackToNode,
+          nodeRemovedEvent.getRemovedRMNode().getRackName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      removeFromMapping(hostToNode,
+          nodeRemovedEvent.getRemovedRMNode().getHostName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      break;
+    case NODE_UPDATE:
+      if (!(event instanceof NodeUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
+          event;
+      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+      break;
+    case NODE_RESOURCE_UPDATE:
+      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+          (NodeResourceUpdateSchedulerEvent) event;
+      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+          nodeResourceUpdatedEvent.getResourceOption());
+      break;
+
+    // <-- IGNORED EVENTS : START -->
+    case APP_ADDED:
+      break;
+    case APP_REMOVED:
+      break;
+    case APP_ATTEMPT_ADDED:
+      break;
+    case APP_ATTEMPT_REMOVED:
+      break;
+    case CONTAINER_EXPIRED:
+      break;
+    case NODE_LABELS_UPDATE:
+      break;
+    // <-- IGNORED EVENTS : END -->
+    default:
+      LOG.error("Unknown event arrived at " +
+          "OpportunisticContainerAllocatorAMService: " + event.toString());
+    }
+
+  }
+
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return nodeMonitor.getThresholdCalculator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4509045..bf72fc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@@ -1177,24 +1176,27 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      DistributedSchedulingAMService distributedSchedulingService = new
-          DistributedSchedulingAMService(this.rmContext, scheduler);
-      EventDispatcher distSchedulerEventDispatcher =
-          new EventDispatcher(distributedSchedulingService,
-              DistributedSchedulingAMService.class.getName());
-      // Add an event dispatcher for the DistributedSchedulingAMService
-      // to handle node updates/additions and removals.
+    Configuration config = this.rmContext.getYarnConfiguration();
+    if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
+        || YarnConfiguration.isDistSchedulingEnabled(config)) {
+      OpportunisticContainerAllocatorAMService
+          oppContainerAllocatingAMService =
+          new OpportunisticContainerAllocatorAMService(this.rmContext,
+              scheduler);
+      EventDispatcher oppContainerAllocEventDispatcher =
+          new EventDispatcher(oppContainerAllocatingAMService,
+              OpportunisticContainerAllocatorAMService.class.getName());
+      // Add an event dispatcher for the
+      // OpportunisticContainerAllocatorAMService to handle node
+      // updates/additions and removals.
       // Since the SchedulerEvent is currently a super set of theses,
       // we register interest for it..
-      addService(distSchedulerEventDispatcher);
+      addService(oppContainerAllocEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
-          distSchedulerEventDispatcher);
+          oppContainerAllocEventDispatcher);
       this.rmContext.setContainerQueueLimitCalculator(
-          distributedSchedulingService.getNodeManagerQueueLimitCalculator());
-      return distributedSchedulingService;
+          oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
+      return oppContainerAllocatingAMService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 7d1b3c3..5856e59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -820,9 +820,10 @@ public class MockRM extends ResourceManager {
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
     if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingAMService(getRMContext(), scheduler) {
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) {
+      return new OpportunisticContainerAllocatorAMService(getRMContext(),
+          scheduler) {
         @Override
         protected void serviceStart() {
           // override to not start rpc handler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
deleted file mode 100644
index 0213a94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test cases for {@link DistributedSchedulingAMService}.
- */
-public class TestDistributedSchedulingAMService {
-
-  // Test if the DistributedSchedulingAMService can handle both DSProtocol as
-  // well as AMProtocol clients
-  @Test
-  public void testRPCWrapping() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
-    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
-    final RMContext rmContext = new RMContextImpl() {
-      @Override
-      public AMLivelinessMonitor getAMLivelinessMonitor() {
-        return null;
-      }
-
-      @Override
-      public Configuration getYarnConfiguration() {
-        return new YarnConfiguration();
-      }
-    };
-    Container c = factory.newRecordInstance(Container.class);
-    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
-    c.setId(
-        ContainerId.newContainerId(
-            ApplicationAttemptId.newInstance(
-                ApplicationId.newInstance(12345, 1), 2), 3));
-    AllocateRequest allReq =
-        (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
-    allReq.setAskList(Arrays.asList(
-        ResourceRequest.newInstance(Priority.UNDEFINED, "a",
-            Resource.newInstance(1, 2), 1, true, "exp",
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true))));
-    DistributedSchedulingAMService service =
-        createService(factory, rmContext, c);
-    Server server = service.getServer(rpc, conf, addr, null);
-    server.start();
-
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
-    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
-        ProtobufRpcEngine.class);
-    ApplicationMasterProtocolPB ampProxy =
-        RPC.getProxy(ApplicationMasterProtocolPB
-            .class, 1, NetUtils.getConnectAddress(server), conf);
-    RegisterApplicationMasterResponse regResp =
-        new RegisterApplicationMasterResponsePBImpl(
-            ampProxy.registerApplicationMaster(null,
-                ((RegisterApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(
-                        RegisterApplicationMasterRequest.class)).getProto()));
-    Assert.assertEquals("dummyQueue", regResp.getQueue());
-    FinishApplicationMasterResponse finishResp =
-        new FinishApplicationMasterResponsePBImpl(
-            ampProxy.finishApplicationMaster(null,
-                ((FinishApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(
-                        FinishApplicationMasterRequest.class)).getProto()
-            ));
-    Assert.assertEquals(false, finishResp.getIsUnregistered());
-    AllocateResponse allocResp =
-        new AllocateResponsePBImpl(
-            ampProxy.allocate(null,
-                ((AllocateRequestPBImpl)factory
-                    .newRecordInstance(AllocateRequest.class)).getProto())
-        );
-    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
-    Assert.assertEquals(1, allocatedContainers.size());
-    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-        allocatedContainers.get(0).getExecutionType());
-    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
-    // Verify that the DistrubutedSchedulingService can handle the
-    // DistributedSchedulingAMProtocol clients as well
-    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
-        ProtobufRpcEngine.class);
-    DistributedSchedulingAMProtocolPB dsProxy =
-        RPC.getProxy(DistributedSchedulingAMProtocolPB
-            .class, 1, NetUtils.getConnectAddress(server), conf);
-
-    RegisterDistributedSchedulingAMResponse dsRegResp =
-        new RegisterDistributedSchedulingAMResponsePBImpl(
-            dsProxy.registerApplicationMasterForDistributedScheduling(null,
-                ((RegisterApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(RegisterApplicationMasterRequest.class))
-                    .getProto()));
-    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
-    Assert.assertEquals(4,
-        dsRegResp.getMaxContainerResource().getVirtualCores());
-    Assert.assertEquals(1024,
-        dsRegResp.getMinContainerResource().getMemorySize());
-    Assert.assertEquals(2,
-        dsRegResp.getIncrContainerResource().getVirtualCores());
-
-    DistributedSchedulingAllocateRequestPBImpl distAllReq =
-        (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
-            DistributedSchedulingAllocateRequest.class);
-    distAllReq.setAllocateRequest(allReq);
-    distAllReq.setAllocatedContainers(Arrays.asList(c));
-    DistributedSchedulingAllocateResponse dsAllocResp =
-        new DistributedSchedulingAllocateResponsePBImpl(
-            dsProxy.allocateForDistributedScheduling(null,
-                distAllReq.getProto()));
-    Assert.assertEquals(
-        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
-
-    FinishApplicationMasterResponse dsfinishResp =
-        new FinishApplicationMasterResponsePBImpl(
-            dsProxy.finishApplicationMaster(null,
-                ((FinishApplicationMasterRequestPBImpl) factory
-                    .newRecordInstance(FinishApplicationMasterRequest.class))
-                    .getProto()));
-    Assert.assertEquals(
-        false, dsfinishResp.getIsUnregistered());
-  }
-
-  private DistributedSchedulingAMService createService(final RecordFactory
-      factory, final RMContext rmContext, final Container c) {
-    return new DistributedSchedulingAMService(rmContext, null) {
-      @Override
-      public RegisterApplicationMasterResponse registerApplicationMaster(
-          RegisterApplicationMasterRequest request) throws
-          YarnException, IOException {
-        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
-            RegisterApplicationMasterResponse.class);
-        // Dummy Entry to Assert that we get this object back
-        resp.setQueue("dummyQueue");
-        return resp;
-      }
-
-      @Override
-      public FinishApplicationMasterResponse finishApplicationMaster(
-          FinishApplicationMasterRequest request) throws YarnException,
-          IOException {
-        FinishApplicationMasterResponse resp = factory.newRecordInstance(
-            FinishApplicationMasterResponse.class);
-        // Dummy Entry to Assert that we get this object back
-        resp.setIsUnregistered(false);
-        return resp;
-      }
-
-      @Override
-      public AllocateResponse allocate(AllocateRequest request) throws
-          YarnException, IOException {
-        AllocateResponse response = factory.newRecordInstance(
-            AllocateResponse.class);
-        response.setNumClusterNodes(12345);
-        response.setAllocatedContainers(Arrays.asList(c));
-        return response;
-      }
-
-      @Override
-      public RegisterDistributedSchedulingAMResponse
-          registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request)
-          throws YarnException, IOException {
-        RegisterDistributedSchedulingAMResponse resp = factory
-            .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
-        resp.setContainerIdStart(54321L);
-        resp.setMaxContainerResource(Resource.newInstance(4096, 4));
-        resp.setMinContainerResource(Resource.newInstance(1024, 1));
-        resp.setIncrContainerResource(Resource.newInstance(2048, 2));
-        return resp;
-      }
-
-      @Override
-      public DistributedSchedulingAllocateResponse
-          allocateForDistributedScheduling(
-          DistributedSchedulingAllocateRequest request)
-          throws YarnException, IOException {
-        List<ResourceRequest> askList =
-            request.getAllocateRequest().getAskList();
-        List<Container> allocatedContainers = request.getAllocatedContainers();
-        Assert.assertEquals(1, allocatedContainers.size());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            allocatedContainers.get(0).getExecutionType());
-        Assert.assertEquals(1, askList.size());
-        Assert.assertTrue(askList.get(0)
-            .getExecutionTypeRequest().getEnforceExecutionType());
-        DistributedSchedulingAllocateResponse resp = factory
-            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
-        resp.setNodesForScheduling(
-            Arrays.asList(NodeId.newInstance("h1", 1234)));
-        return resp;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9e061/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
new file mode 100644
index 0000000..07c6b54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test cases for {@link OpportunisticContainerAllocatorAMService}.
+ */
+public class TestOpportunisticContainerAllocatorAMService {
+
+  // Test if the OpportunisticContainerAllocatorAMService can handle both
+  // DSProtocol as well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
+    };
+    Container c = factory.newRecordInstance(Container.class);
+    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    c.setId(
+        ContainerId.newContainerId(
+            ApplicationAttemptId.newInstance(
+                ApplicationId.newInstance(12345, 1), 2), 3));
+    AllocateRequest allReq =
+        (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+    allReq.setAskList(Arrays.asList(
+        ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+            Resource.newInstance(1, 2), 1, true, "exp",
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))));
+    OpportunisticContainerAllocatorAMService service =
+        createService(factory, rmContext, c);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the DistributedSchedulingService can handle vanilla
+    // ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocolPB ampProxy =
+        RPC.getProxy(ApplicationMasterProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp =
+        new RegisterApplicationMasterResponsePBImpl(
+            ampProxy.registerApplicationMaster(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        RegisterApplicationMasterRequest.class)).getProto()));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            ampProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        FinishApplicationMasterRequest.class)).getProto()
+            ));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp =
+        new AllocateResponsePBImpl(
+            ampProxy.allocate(null,
+                ((AllocateRequestPBImpl)factory
+                    .newRecordInstance(AllocateRequest.class)).getProto())
+        );
+    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+    Assert.assertEquals(1, allocatedContainers.size());
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        allocatedContainers.get(0).getExecutionType());
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the DistributedSchedulingService can handle the
+    // DistributedSchedulingAMProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulingAMProtocolPB dsProxy =
+        RPC.getProxy(DistributedSchedulingAMProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+
+    RegisterDistributedSchedulingAMResponse dsRegResp =
+        new RegisterDistributedSchedulingAMResponsePBImpl(
+            dsProxy.registerApplicationMasterForDistributedScheduling(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(RegisterApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+    Assert.assertEquals(4,
+        dsRegResp.getMaxContainerResource().getVirtualCores());
+    Assert.assertEquals(1024,
+        dsRegResp.getMinContainerResource().getMemorySize());
+    Assert.assertEquals(2,
+        dsRegResp.getIncrContainerResource().getVirtualCores());
+
+    DistributedSchedulingAllocateRequestPBImpl distAllReq =
+        (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+            DistributedSchedulingAllocateRequest.class);
+    distAllReq.setAllocateRequest(allReq);
+    distAllReq.setAllocatedContainers(Arrays.asList(c));
+    DistributedSchedulingAllocateResponse dsAllocResp =
+        new DistributedSchedulingAllocateResponsePBImpl(
+            dsProxy.allocateForDistributedScheduling(null,
+                distAllReq.getProto()));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+
+    FinishApplicationMasterResponse dsfinishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            dsProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl) factory
+                    .newRecordInstance(FinishApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(
+        false, dsfinishResp.getIsUnregistered());
+  }
+
+  /**
+   * Builds a stub {@code OpportunisticContainerAllocatorAMService} whose
+   * protocol methods return canned responses.  The calling test sends
+   * requests to this service over RPC and asserts that the distinctive
+   * values set here (queue name, container-id start, cluster-node count,
+   * node list, etc.) survive the protobuf round trip intact.
+   *
+   * @param factory record factory used to instantiate the canned
+   *                response objects
+   * @param rmContext RM context forwarded to the service constructor
+   * @param c container echoed back from {@code allocate}; the caller
+   *          asserts its {@code ExecutionType} after the RPC round trip
+   * @return the stubbed AM service, ready to be bound to an RPC server
+   */
+  private OpportunisticContainerAllocatorAMService createService(
+      final RecordFactory factory, final RMContext rmContext,
+      final Container c) {
+    return new OpportunisticContainerAllocatorAMService(rmContext, null) {
+      @Override
+      public RegisterApplicationMasterResponse registerApplicationMaster(
+          RegisterApplicationMasterRequest request) throws
+          YarnException, IOException {
+        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+            RegisterApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setQueue("dummyQueue");
+        return resp;
+      }
+
+      @Override
+      public FinishApplicationMasterResponse finishApplicationMaster(
+          FinishApplicationMasterRequest request) throws YarnException,
+          IOException {
+        FinishApplicationMasterResponse resp = factory.newRecordInstance(
+            FinishApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setIsUnregistered(false);
+        return resp;
+      }
+
+      @Override
+      public AllocateResponse allocate(AllocateRequest request) throws
+          YarnException, IOException {
+        AllocateResponse response = factory.newRecordInstance(
+            AllocateResponse.class);
+        // Sentinel node count plus the caller-supplied container; the
+        // test asserts both after the RPC round trip.
+        response.setNumClusterNodes(12345);
+        response.setAllocatedContainers(Arrays.asList(c));
+        return response;
+      }
+
+      @Override
+      public RegisterDistributedSchedulingAMResponse
+          registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
+        RegisterDistributedSchedulingAMResponse resp = factory
+            .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+        // Distinctive canned values; each is asserted individually by the
+        // test client after deserialization.
+        resp.setContainerIdStart(54321L);
+        resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+        resp.setMinContainerResource(Resource.newInstance(1024, 1));
+        resp.setIncrContainerResource(Resource.newInstance(2048, 2));
+        return resp;
+      }
+
+      @Override
+      public DistributedSchedulingAllocateResponse
+          allocateForDistributedScheduling(
+          DistributedSchedulingAllocateRequest request)
+          throws YarnException, IOException {
+        // Server-side assertions: verify the request the client built
+        // (one opportunistic container, one enforced-execution-type ask)
+        // decoded correctly before sending the reply.
+        List<ResourceRequest> askList =
+            request.getAllocateRequest().getAskList();
+        List<Container> allocatedContainers = request.getAllocatedContainers();
+        Assert.assertEquals(1, allocatedContainers.size());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            allocatedContainers.get(0).getExecutionType());
+        Assert.assertEquals(1, askList.size());
+        Assert.assertTrue(askList.get(0)
+            .getExecutionTypeRequest().getEnforceExecutionType());
+        DistributedSchedulingAllocateResponse resp = factory
+            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+        // "h1" is the marker the test client asserts on.
+        resp.setNodesForScheduling(
+            Arrays.asList(NodeId.newInstance("h1", 1234)));
+        return resp;
+      }
+    };
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message