hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject [43/50] [abbrv] hadoop git commit: YARN-5113. Refactoring and other clean-up for distributed scheduling. (Konstantinos Karanasos via asuresh)
Date Mon, 01 Aug 2016 15:55:34 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
deleted file mode 100644
index 5aabddc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
-
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The DistributedSchedulingService is started instead of the
- * ApplicationMasterService if DistributedScheduling is enabled for the YARN
- * cluster.
- * It extends the functionality of the ApplicationMasterService by servicing
- * clients (AMs and AMRMProxy request interceptors) that understand the
- * DistributedSchedulingProtocol.
- */
-public class DistributedSchedulingService extends ApplicationMasterService
-    implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
-
-  private static final Log LOG =
-      LogFactory.getLog(DistributedSchedulingService.class);
-
-  private final NodeQueueLoadMonitor nodeMonitor;
-
-  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
-      new ConcurrentHashMap<>();
-  private final int k;
-
-  public DistributedSchedulingService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
-    this.k = rmContext.getYarnConfiguration().getInt(
-        YarnConfiguration.DIST_SCHEDULING_TOP_K,
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
-    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
-        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
-        YarnConfiguration.
-            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
-    NodeQueueLoadMonitor.LoadComparator comparator =
-        NodeQueueLoadMonitor.LoadComparator.valueOf(
-            rmContext.getYarnConfiguration().get(
-                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
-                YarnConfiguration.
-                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
-
-    NodeQueueLoadMonitor topKSelector =
-        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
-
-    float sigma = rmContext.getYarnConfiguration()
-        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
-            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
-
-    int limitMin, limitMax;
-
-    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
-      limitMin = rmContext.getYarnConfiguration()
-          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
-      limitMax = rmContext.getYarnConfiguration()
-          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
-    } else {
-      limitMin = rmContext.getYarnConfiguration()
-          .getInt(
-              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
-      limitMax = rmContext.getYarnConfiguration()
-          .getInt(
-              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
-              YarnConfiguration.
-                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
-    }
-
-    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
-    this.nodeMonitor = topKSelector;
-  }
-
-  @Override
-  public Server getServer(YarnRPC rpc, Configuration serverConf,
-      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running on NMs that DO NOT support
-    // Dist Scheduling... The server multiplexes both the
-    // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.registerApplicationMaster(request);
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.finishApplicationMaster(request);
-  }
-
-  @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
-    return super.allocate(request);
-  }
-
-  @Override
-  public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling(
-      RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    RegisterApplicationMasterResponse response =
-        registerApplicationMaster(request);
-    DistSchedRegisterResponse dsResp = recordFactory
-        .newRecordInstance(DistSchedRegisterResponse.class);
-    dsResp.setRegisterResponse(response);
-    dsResp.setMinAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setMaxAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setIncrAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setContainerTokenExpiryInterval(
-        getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
-            YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
-    dsResp.setContainerIdStart(
-        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
-    // Set nodes to be used for scheduling
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
-    return dsResp;
-  }
-
-  @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
-    List<Container> distAllocContainers = request.getAllocatedContainers();
-    for (Container container : distAllocContainers) {
-      // Create RMContainer
-      SchedulerApplicationAttempt appAttempt =
-          ((AbstractYarnScheduler) rmContext.getScheduler())
-              .getCurrentAttemptForContainer(container.getId());
-      RMContainer rmContainer = new RMContainerImpl(container,
-          appAttempt.getApplicationAttemptId(), container.getNodeId(),
-          appAttempt.getUser(), rmContext, true);
-      appAttempt.addRMContainer(container.getId(), rmContainer);
-      rmContainer.handle(
-          new RMContainerEvent(container.getId(),
-              RMContainerEventType.LAUNCHED));
-    }
-    AllocateResponse response = allocate(request.getAllocateRequest());
-    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
-        (DistSchedAllocateResponse.class);
-    dsResp.setAllocateResponse(response);
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
-    return dsResp;
-  }
-
-  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-      String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.add(nodeId);
-      }
-    }
-  }
-
-  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-      String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.remove(nodeId);
-      }
-    }
-  }
-
-  @Override
-  public void handle(SchedulerEvent event) {
-    switch (event.getType()) {
-    case NODE_ADDED:
-      if (!(event instanceof NodeAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
-      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
-          nodeAddedEvent.getAddedRMNode());
-      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      break;
-    case NODE_REMOVED:
-      if (!(event instanceof NodeRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeRemovedSchedulerEvent nodeRemovedEvent =
-          (NodeRemovedSchedulerEvent) event;
-      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-      removeFromMapping(rackToNode,
-          nodeRemovedEvent.getRemovedRMNode().getRackName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      removeFromMapping(hostToNode,
-          nodeRemovedEvent.getRemovedRMNode().getHostName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      break;
-    case NODE_UPDATE:
-      if (!(event instanceof NodeUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
-          event;
-      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
-      break;
-    case NODE_RESOURCE_UPDATE:
-      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
-          (NodeResourceUpdateSchedulerEvent) event;
-      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
-          nodeResourceUpdatedEvent.getResourceOption());
-      break;
-
-    // <-- IGNORED EVENTS : START -->
-    case APP_ADDED:
-      break;
-    case APP_REMOVED:
-      break;
-    case APP_ATTEMPT_ADDED:
-      break;
-    case APP_ATTEMPT_REMOVED:
-      break;
-    case CONTAINER_EXPIRED:
-      break;
-    case NODE_LABELS_UPDATE:
-      break;
-    // <-- IGNORED EVENTS : END -->
-    default:
-      LOG.error("Unknown event arrived at DistributedSchedulingService: "
-          + event.toString());
-    }
-
-  }
-
-  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
-    return nodeMonitor.getThresholdCalculator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0c1df33..4509045 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1180,12 +1180,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      DistributedSchedulingService distributedSchedulingService = new
-          DistributedSchedulingService(this.rmContext, scheduler);
+      DistributedSchedulingAMService distributedSchedulingService = new
+          DistributedSchedulingAMService(this.rmContext, scheduler);
       EventDispatcher distSchedulerEventDispatcher =
           new EventDispatcher(distributedSchedulingService,
-              DistributedSchedulingService.class.getName());
-      // Add an event dispoatcher for the DistributedSchedulingService
+              DistributedSchedulingAMService.class.getName());
+      // Add an event dispatcher for the DistributedSchedulingAMService
       // to handle node updates/additions and removals.
       // Since the SchedulerEvent is currently a super set of theses,
       // we register interest for it..

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 3764664..c677345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -93,8 +94,8 @@ public class AppSchedulingInfo {
     this.queue = queue;
     this.user = user;
     this.activeUsersManager = activeUsersManager;
-    this.containerIdCounter =
-        new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.containerIdCounter = new AtomicLong(
+        epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index dcdc934..7d1b3c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -822,7 +822,7 @@ public class MockRM extends ResourceManager {
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingService(getRMContext(), scheduler) {
+      return new DistributedSchedulingAMService(getRMContext(), scheduler) {
         @Override
         protected void serviceStart() {
           // override to not start rpc handler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
new file mode 100644
index 0000000..0213a94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test cases for {@link DistributedSchedulingAMService}.
+ */
+public class TestDistributedSchedulingAMService {
+
+  // Test if the DistributedSchedulingAMService can handle both DSProtocol as
+  // well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
+    };
+    Container c = factory.newRecordInstance(Container.class);
+    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    c.setId(
+        ContainerId.newContainerId(
+            ApplicationAttemptId.newInstance(
+                ApplicationId.newInstance(12345, 1), 2), 3));
+    AllocateRequest allReq =
+        (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+    allReq.setAskList(Arrays.asList(
+        ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+            Resource.newInstance(1, 2), 1, true, "exp",
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))));
+    DistributedSchedulingAMService service =
+        createService(factory, rmContext, c);
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the DistributedSchedulingAMService can handle vanilla
+    // ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocolPB ampProxy =
+        RPC.getProxy(ApplicationMasterProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp =
+        new RegisterApplicationMasterResponsePBImpl(
+            ampProxy.registerApplicationMaster(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        RegisterApplicationMasterRequest.class)).getProto()));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            ampProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        FinishApplicationMasterRequest.class)).getProto()
+            ));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp =
+        new AllocateResponsePBImpl(
+            ampProxy.allocate(null,
+                ((AllocateRequestPBImpl)factory
+                    .newRecordInstance(AllocateRequest.class)).getProto())
+        );
+    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+    Assert.assertEquals(1, allocatedContainers.size());
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        allocatedContainers.get(0).getExecutionType());
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the DistributedSchedulingAMService can handle the
+    // DistributedSchedulingAMProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulingAMProtocolPB dsProxy =
+        RPC.getProxy(DistributedSchedulingAMProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+
+    RegisterDistributedSchedulingAMResponse dsRegResp =
+        new RegisterDistributedSchedulingAMResponsePBImpl(
+            dsProxy.registerApplicationMasterForDistributedScheduling(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(RegisterApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+    Assert.assertEquals(4,
+        dsRegResp.getMaxContainerResource().getVirtualCores());
+    Assert.assertEquals(1024,
+        dsRegResp.getMinContainerResource().getMemorySize());
+    Assert.assertEquals(2,
+        dsRegResp.getIncrContainerResource().getVirtualCores());
+
+    DistributedSchedulingAllocateRequestPBImpl distAllReq =
+        (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+            DistributedSchedulingAllocateRequest.class);
+    distAllReq.setAllocateRequest(allReq);
+    distAllReq.setAllocatedContainers(Arrays.asList(c));
+    DistributedSchedulingAllocateResponse dsAllocResp =
+        new DistributedSchedulingAllocateResponsePBImpl(
+            dsProxy.allocateForDistributedScheduling(null,
+                distAllReq.getProto()));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+
+    FinishApplicationMasterResponse dsfinishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            dsProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl) factory
+                    .newRecordInstance(FinishApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(
+        false, dsfinishResp.getIsUnregistered());
+  }
+
+  private DistributedSchedulingAMService createService(final RecordFactory
+      factory, final RMContext rmContext, final Container c) {
+    return new DistributedSchedulingAMService(rmContext, null) {
+      @Override
+      public RegisterApplicationMasterResponse registerApplicationMaster(
+          RegisterApplicationMasterRequest request) throws
+          YarnException, IOException {
+        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+            RegisterApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setQueue("dummyQueue");
+        return resp;
+      }
+
+      @Override
+      public FinishApplicationMasterResponse finishApplicationMaster(
+          FinishApplicationMasterRequest request) throws YarnException,
+          IOException {
+        FinishApplicationMasterResponse resp = factory.newRecordInstance(
+            FinishApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setIsUnregistered(false);
+        return resp;
+      }
+
+      @Override
+      public AllocateResponse allocate(AllocateRequest request) throws
+          YarnException, IOException {
+        AllocateResponse response = factory.newRecordInstance(
+            AllocateResponse.class);
+        response.setNumClusterNodes(12345);
+        response.setAllocatedContainers(Arrays.asList(c));
+        return response;
+      }
+
+      @Override
+      public RegisterDistributedSchedulingAMResponse
+          registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
+        RegisterDistributedSchedulingAMResponse resp = factory
+            .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+        resp.setContainerIdStart(54321L);
+        resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+        resp.setMinContainerResource(Resource.newInstance(1024, 1));
+        resp.setIncrContainerResource(Resource.newInstance(2048, 2));
+        return resp;
+      }
+
+      @Override
+      public DistributedSchedulingAllocateResponse
+          allocateForDistributedScheduling(
+          DistributedSchedulingAllocateRequest request)
+          throws YarnException, IOException {
+        List<ResourceRequest> askList =
+            request.getAllocateRequest().getAskList();
+        List<Container> allocatedContainers = request.getAllocatedContainers();
+        Assert.assertEquals(1, allocatedContainers.size());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            allocatedContainers.get(0).getExecutionType());
+        Assert.assertEquals(1, askList.size());
+        Assert.assertTrue(askList.get(0)
+            .getExecutionTypeRequest().getEnforceExecutionType());
+        DistributedSchedulingAllocateResponse resp = factory
+            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+        resp.setNodesForScheduling(
+            Arrays.asList(NodeId.newInstance("h1", 1234)));
+        return resp;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
deleted file mode 100644
index 4716bab..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
-    .DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
-    .DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
-    .AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-
-public class TestDistributedSchedulingService {
-
-  // Test if the DistributedSchedulingService can handle both DSProtocol as
-  // well as AMProtocol clients
-  @Test
-  public void testRPCWrapping() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
-    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
-    final RMContext rmContext = new RMContextImpl() {
-      @Override
-      public AMLivelinessMonitor getAMLivelinessMonitor() {
-        return null;
-      }
-
-      @Override
-      public Configuration getYarnConfiguration() {
-        return new YarnConfiguration();
-      }
-    };
-    Container c = factory.newRecordInstance(Container.class);
-    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
-    c.setId(
-        ContainerId.newContainerId(
-            ApplicationAttemptId.newInstance(
-                ApplicationId.newInstance(12345, 1), 2), 3));
-    AllocateRequest allReq =
-        (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
-    allReq.setAskList(Arrays.asList(
-        ResourceRequest.newInstance(Priority.UNDEFINED, "a",
-            Resource.newInstance(1, 2), 1, true, "exp",
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true))));
-    DistributedSchedulingService service = createService(factory, rmContext, c);
-    Server server = service.getServer(rpc, conf, addr, null);
-    server.start();
-
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
-    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
-        ProtobufRpcEngine.class);
-    ApplicationMasterProtocolPB ampProxy =
-        RPC.getProxy(ApplicationMasterProtocolPB
-        .class, 1, NetUtils.getConnectAddress(server), conf);
-    RegisterApplicationMasterResponse regResp =
-        new RegisterApplicationMasterResponsePBImpl(
-            ampProxy.registerApplicationMaster(null,
-                ((RegisterApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(
-                        RegisterApplicationMasterRequest.class)).getProto()));
-    Assert.assertEquals("dummyQueue", regResp.getQueue());
-    FinishApplicationMasterResponse finishResp =
-        new FinishApplicationMasterResponsePBImpl(
-            ampProxy.finishApplicationMaster(null,
-                ((FinishApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(
-                        FinishApplicationMasterRequest.class)).getProto()
-            ));
-    Assert.assertEquals(false, finishResp.getIsUnregistered());
-    AllocateResponse allocResp =
-        new AllocateResponsePBImpl(
-            ampProxy.allocate(null,
-                ((AllocateRequestPBImpl)factory
-                    .newRecordInstance(AllocateRequest.class)).getProto())
-        );
-    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
-    Assert.assertEquals(1, allocatedContainers.size());
-    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-        allocatedContainers.get(0).getExecutionType());
-    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
-    // Verify that the DistrubutedSchedulingService can handle the
-    // DistributedSchedulerProtocol clients as well
-    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
-        ProtobufRpcEngine.class);
-    DistributedSchedulerProtocolPB dsProxy =
-        RPC.getProxy(DistributedSchedulerProtocolPB
-            .class, 1, NetUtils.getConnectAddress(server), conf);
-
-    DistSchedRegisterResponse dsRegResp =
-        new DistSchedRegisterResponsePBImpl(
-            dsProxy.registerApplicationMasterForDistributedScheduling(null,
-                ((RegisterApplicationMasterRequestPBImpl)factory
-                    .newRecordInstance(RegisterApplicationMasterRequest.class))
-                    .getProto()));
-    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
-    Assert.assertEquals(4,
-        dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
-    Assert.assertEquals(1024,
-        dsRegResp.getMinAllocatableCapabilty().getMemorySize());
-    Assert.assertEquals(2,
-        dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
-
-    DistSchedAllocateRequestPBImpl distAllReq =
-        (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
-            DistSchedAllocateRequest.class);
-    distAllReq.setAllocateRequest(allReq);
-    distAllReq.setAllocatedContainers(Arrays.asList(c));
-    DistSchedAllocateResponse dsAllocResp =
-        new DistSchedAllocateResponsePBImpl(
-            dsProxy.allocateForDistributedScheduling(null,
-                distAllReq.getProto()));
-    Assert.assertEquals(
-        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
-
-    FinishApplicationMasterResponse dsfinishResp =
-        new FinishApplicationMasterResponsePBImpl(
-            dsProxy.finishApplicationMaster(null,
-                ((FinishApplicationMasterRequestPBImpl) factory
-                    .newRecordInstance(FinishApplicationMasterRequest.class))
-                    .getProto()));
-    Assert.assertEquals(
-        false, dsfinishResp.getIsUnregistered());
-  }
-
-  private DistributedSchedulingService createService(final RecordFactory
-      factory, final RMContext rmContext, final Container c) {
-    return new DistributedSchedulingService(rmContext, null) {
-      @Override
-      public RegisterApplicationMasterResponse registerApplicationMaster(
-          RegisterApplicationMasterRequest request) throws
-          YarnException, IOException {
-        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
-            RegisterApplicationMasterResponse.class);
-        // Dummy Entry to Assert that we get this object back
-        resp.setQueue("dummyQueue");
-        return resp;
-      }
-
-      @Override
-      public FinishApplicationMasterResponse finishApplicationMaster(
-          FinishApplicationMasterRequest request) throws YarnException,
-          IOException {
-        FinishApplicationMasterResponse resp = factory.newRecordInstance(
-            FinishApplicationMasterResponse.class);
-        // Dummy Entry to Assert that we get this object back
-        resp.setIsUnregistered(false);
-        return resp;
-      }
-
-      @Override
-      public AllocateResponse allocate(AllocateRequest request) throws
-          YarnException, IOException {
-        AllocateResponse response = factory.newRecordInstance(
-            AllocateResponse.class);
-        response.setNumClusterNodes(12345);
-        response.setAllocatedContainers(Arrays.asList(c));
-        return response;
-      }
-
-      @Override
-      public DistSchedRegisterResponse
-          registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request) throws
-          YarnException, IOException {
-        DistSchedRegisterResponse resp = factory.newRecordInstance(
-            DistSchedRegisterResponse.class);
-        resp.setContainerIdStart(54321L);
-        resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
-        resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
-        resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
-        return resp;
-      }
-
-      @Override
-      public DistSchedAllocateResponse allocateForDistributedScheduling(
-          DistSchedAllocateRequest request) throws YarnException, IOException {
-        List<ResourceRequest> askList =
-            request.getAllocateRequest().getAskList();
-        List<Container> allocatedContainers = request.getAllocatedContainers();
-        Assert.assertEquals(1, allocatedContainers.size());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            allocatedContainers.get(0).getExecutionType());
-        Assert.assertEquals(1, askList.size());
-        Assert.assertTrue(askList.get(0)
-            .getExecutionTypeRequest().getEnforceExecutionType());
-        DistSchedAllocateResponse resp =
-            factory.newRecordInstance(DistSchedAllocateResponse.class);
-        resp.setNodesForScheduling(
-            Arrays.asList(NodeId.newInstance("h1", 1234)));
-        return resp;
-      }
-    };
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message