Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 624E0200B6F for ; Tue, 9 Aug 2016 09:49:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6100F160AA4; Tue, 9 Aug 2016 07:49:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3F365160AA5 for ; Tue, 9 Aug 2016 09:49:32 +0200 (CEST) Received: (qmail 43292 invoked by uid 500); 9 Aug 2016 07:49:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 43136 invoked by uid 99); 9 Aug 2016 07:49:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Aug 2016 07:49:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C6DDE78A1; Tue, 9 Aug 2016 07:49:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Tue, 09 Aug 2016 07:49:31 -0000 Message-Id: In-Reply-To: <5ded045015714a1b9335602050876d66@git.apache.org> References: <5ded045015714a1b9335602050876d66@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh) archived-at: Tue, 09 Aug 2016 07:49:34 -0000 YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh) (cherry picked from commit 82c9e061017c32e633e0b0cbb7978749a6df4fb2) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5f7edb79 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5f7edb79 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5f7edb79 Branch: refs/heads/branch-2 Commit: 5f7edb79d155df57c0b5cc8d8d2b0f908dee8ee9 Parents: 427b540 Author: Arun Suresh Authored: Fri Aug 5 11:13:05 2016 -0700 Committer: Arun Suresh Committed: Tue Aug 9 00:46:08 2016 -0700 ---------------------------------------------------------------------- .../hadoop/mapred/TestMROpportunisticMaps.java | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 107 +++--- .../api/impl/TestDistributedScheduling.java | 4 +- .../src/main/resources/yarn-default.xml | 50 +-- .../OpportunisticContainerAllocator.java | 378 +++++++++++++++++++ .../OpportunisticContainerContext.java | 178 +++++++++ .../yarn/server/scheduler/package-info.java | 22 ++ .../hadoop/yarn/server/nodemanager/Context.java | 2 +- .../yarn/server/nodemanager/NodeManager.java | 9 +- .../amrmproxy/DefaultRequestInterceptor.java | 67 +++- .../scheduler/DistributedScheduler.java | 264 +++---------- .../OpportunisticContainerAllocator.java | 190 ---------- .../amrmproxy/BaseAMRMProxyTest.java | 2 +- .../scheduler/TestDistributedScheduler.java | 10 +- .../DistributedSchedulingAMService.java | 361 ------------------ ...pportunisticContainerAllocatorAMService.java | 367 ++++++++++++++++++ .../server/resourcemanager/ResourceManager.java | 32 +- .../yarn/server/resourcemanager/MockRM.java | 7 +- .../TestDistributedSchedulingAMService.java | 269 ------------- ...pportunisticContainerAllocatorAMService.java | 271 +++++++++++++ 20 files changed, 1445 insertions(+), 1147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index dfe85f2..021863b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -91,6 +91,8 @@ public class TestMROpportunisticMaps { Configuration conf = new Configuration(); // Start the mini-MR and mini-DFS clusters conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); dfsCluster = new MiniDFSCluster.Builder(conf) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8aa0419..c341022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -301,55 +301,60 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "distributed-scheduling.enabled"; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; - /** Minimum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; - - /** Minimum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.min-container-vcores"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; - - /** Maximum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MAX_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; - public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; - - /** Maximum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.max-container-vcores"; - public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4; - - /** Incremental memory (in MB) used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT = + /** Setting that controls whether opportunistic container allocation + * is enabled or not. */ + public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = + YARN_PREFIX + "opportunistic-container-allocation.enabled"; + public static final boolean + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false; + + /** Minimum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.min-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512; + + /** Minimum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES = + YARN_PREFIX + "opportunistic-containers.min-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1; + + /** Maximum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.max-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048; + + /** Maximum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES = + YARN_PREFIX + "opportunistic-containers.max-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4; + + /** Incremental memory (in MB) used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.incr-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT = 512; - /** Incremental virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.incr-vcores"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; - - /** Container token expiry for container allocated via distributed - * scheduling. */ - public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = - YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; - public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + /** Incremental virtual CPU cores used for allocating an opportunistic + * container. */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES = + YARN_PREFIX + "opportunistic-containers.incr-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1; + + /** Container token expiry for opportunistic containers. */ + public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS = + YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms"; + public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT = 600000; - /** Number of nodes to be used by the LocalScheduler of a NodeManager for - * dispatching containers during distributed scheduling. */ - public static final String DIST_SCHEDULING_NODES_NUMBER_USED = - YARN_PREFIX + "distributed-scheduling.nodes-used"; - public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; + /** Number of nodes to be used by the Opportunistic Container allocator for + * dispatching containers during container allocation. */ + public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED = + YARN_PREFIX + "opportunistic-container-allocation.nodes-used"; + public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT = + 10; /** Frequency for computing least loaded NMs. */ public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = @@ -2733,6 +2738,18 @@ public class YarnConfiguration extends Configuration { return clusterId; } + public static boolean isDistSchedulingEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); + } + + public static boolean isOpportunisticContainerAllocationEnabled( + Configuration conf) { + return conf.getBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT); + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 7245bc6..11da7ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -84,7 +84,7 @@ import static org.mockito.Mockito.when; * specifying OPPORTUNISTIC containers in its resource requests, * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor * on the NM and the DistributedSchedulingProtocol used by the framework to talk - * to the DistributedSchedulingAMService running on the RM. + * to the OpportunisticContainerAllocatorAMService running on the RM. */ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { @@ -105,6 +105,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); cluster.init(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9b16d05..2a8bd2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2678,72 +2678,76 @@ - Minimum memory (in MB) used for allocating a container through distributed - scheduling. + Setting that controls whether opportunistic container allocation + is enabled. - yarn.distributed-scheduling.min-container-memory-mb + yarn.opportunistic-container-allocation.enabled + false + + + + + Minimum memory (in MB) used for allocating an opportunistic container. + + yarn.opportunistic-containers.min-memory-mb 512 - Minimum virtual CPU cores used for allocating a container through - distributed scheduling. + Minimum virtual CPU cores used for allocating an opportunistic container. - yarn.distributed-scheduling.min-container-vcores + yarn.opportunistic-containers.min-vcores 1 - Maximum memory (in MB) used for allocating a container through distributed - scheduling. + Maximum memory (in MB) used for allocating an opportunistic container. - yarn.distributed-scheduling.max-container-memory-mb + yarn.opportunistic-containers.max-memory-mb 2048 - Maximum virtual CPU cores used for allocating a container through - distributed scheduling. + Maximum virtual CPU cores used for allocating an opportunistic container. - yarn.distributed-scheduling.max-container-vcores + yarn.opportunistic-containers.max-vcores 4 - Incremental memory (in MB) used for allocating a container through - distributed scheduling. + Incremental memory (in MB) used for allocating an opportunistic container. - yarn.distributed-scheduling.incr-container-memory-mb + yarn.opportunistic-containers.incr-memory-mb 512 - Incremental virtual CPU cores used for allocating a container through - distributed scheduling. + Incremental virtual CPU cores used for allocating an opportunistic + container. - yarn.distributed-scheduling.incr-vcores + yarn.opportunistic-containers.incr-vcores 1 - Container token expiry for container allocated via distributed scheduling. + Container token expiry for opportunistic containers. - yarn.distributed-scheduling.container-token-expiry-ms + yarn.opportunistic-containers.container-token-expiry-ms 600000 - Number of nodes to be used by the LocalScheduler of a NodeManager for - dispatching containers during distributed scheduling. + Number of nodes to be used by the Opportunistic Container Allocator for + dispatching containers during container allocation. - yarn.distributed-scheduling.nodes-used + yarn.opportunistic-container-allocation.nodes-used 10 http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java new file mode 100644 index 0000000..41b5d56 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; + +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + *

+ * The OpportunisticContainerAllocator allocates containers on a given list of + * nodes, after modifying the container sizes to respect the limits set by the + * ResourceManager. It tries to distribute the containers as evenly as possible. + *

+ */ +public class OpportunisticContainerAllocator { + + /** + * This class encapsulates application specific parameters used to build a + * Container. + */ + public static class AllocationParams { + private Resource maxResource; + private Resource minResource; + private Resource incrementResource; + private int containerTokenExpiryInterval; + + /** + * Return Max Resource. + * @return Resource + */ + public Resource getMaxResource() { + return maxResource; + } + + /** + * Set Max Resource. + * @param maxResource Resource + */ + public void setMaxResource(Resource maxResource) { + this.maxResource = maxResource; + } + + /** + * Get Min Resource. + * @return Resource + */ + public Resource getMinResource() { + return minResource; + } + + /** + * Set Min Resource. + * @param minResource Resource + */ + public void setMinResource(Resource minResource) { + this.minResource = minResource; + } + + /** + * Get Incremental Resource. + * @return Incremental Resource + */ + public Resource getIncrementResource() { + return incrementResource; + } + + /** + * Set Incremental resource. + * @param incrementResource Resource + */ + public void setIncrementResource(Resource incrementResource) { + this.incrementResource = incrementResource; + } + + /** + * Get Container Token Expiry interval. + * @return Container Token Expiry interval + */ + public int getContainerTokenExpiryInterval() { + return containerTokenExpiryInterval; + } + + /** + * Set Container Token Expiry time in ms. + * @param containerTokenExpiryInterval Container Token Expiry in ms + */ + public void setContainerTokenExpiryInterval( + int containerTokenExpiryInterval) { + this.containerTokenExpiryInterval = containerTokenExpiryInterval; + } + } + + /** + * A Container Id Generator. + */ + public static class ContainerIdGenerator { + + protected volatile AtomicLong containerIdCounter = new AtomicLong(1); + + /** + * This method can reset the generator to a specific value. + * @param containerIdStart containerId + */ + public void resetContainerIdCounter(long containerIdStart) { + this.containerIdCounter.set(containerIdStart); + } + + /** + * Sets the underlying Atomic Long. To be used when implementation needs to + * share the underlying AtomicLong of an existing counter. + * @param counter AtomicLong + */ + public void setContainerIdCounter(AtomicLong counter) { + this.containerIdCounter = counter; + } + + /** + * Generates a new long value. Default implementation increments the + * underlying AtomicLong. Sub classes are encouraged to over-ride this + * behaviour. + * @return Counter. + */ + public long generateContainerId() { + return this.containerIdCounter.incrementAndGet(); + } + } + + static class PartitionedResourceRequests { + private List guaranteed = new ArrayList<>(); + private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { + return guaranteed; + } + public List getOpportunistic() { + return opportunistic; + } + } + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocator.class); + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DominantResourceCalculator(); + + private final BaseContainerTokenSecretManager tokenSecretManager; + private int webpagePort; + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param webpagePort Webpage Port + */ + public OpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) { + this.tokenSecretManager = tokenSecretManager; + this.webpagePort = webpagePort; + } + + /** + * Entry point into the Opportunistic Container Allocator. + * @param request AllocateRequest + * @param applicationAttemptId ApplicationAttemptId + * @param appContext App Specific OpportunisticContainerContext + * @param rmIdentifier RM Identifier + * @param appSubmitter App Submitter + * @return List of Containers. + * @throws YarnException YarnException + */ + public List allocateContainers( + AllocateRequest request, ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext appContext, long rmIdentifier, + String appSubmitter) throws YarnException { + // Partition requests into GUARANTEED and OPPORTUNISTIC reqs + PartitionedResourceRequests partitionedAsks = + partitionAskList(request.getAskList()); + + List releasedContainers = request.getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + applicationAttemptId + " released: " + + numReleasedContainers); + appContext.getContainersAllocated().removeAll(releasedContainers); + } + + // Also, update black list + ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + if (rbr != null) { + appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC reqs to the outstanding reqs + appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + + List allocatedContainers = new ArrayList<>(); + for (Priority priority : + appContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given Cap (The actual container size + // might be different than what is requested.. which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map> allocated = allocate(rmIdentifier, + appContext, priority, applicationAttemptId, appSubmitter); + for (Map.Entry> e : allocated.entrySet()) { + appContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + + // Send all the GUARANTEED Reqs to RM + request.setAskList(partitionedAsks.getGuaranteed()); + return allocatedContainers; + } + + private Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, Priority priority, + ApplicationAttemptId appAttId, String userName) throws YarnException { + Map> containers = new HashMap<>(); + for (ResourceRequest anyAsk : + appContext.getOutstandingOpReqs().get(priority).values()) { + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), appContext.getBlacklist(), + appAttId, appContext.getNodeMap(), userName, containers, anyAsk); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); + } + return containers; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, ApplicationAttemptId id, + Map allNodes, String userName, + Map> containers, ResourceRequest anyAsk) + throws YarnException { + int toAllocate = anyAsk.getNumContainers() + - (containers.isEmpty() ? 0 : + containers.get(anyAsk.getCapability()).size()); + + List nodesForScheduling = new ArrayList<>(); + for (Entry nodeEntry : allNodes.entrySet()) { + // Do not use blacklisted nodes for scheduling. + if (blacklist.contains(nodeEntry.getKey())) { + continue; + } + nodesForScheduling.add(nodeEntry.getValue()); + } + int numAllocated = 0; + int nextNodeToSchedule = 0; + for (int numCont = 0; numCont < toAllocate; numCont++) { + nextNodeToSchedule++; + nextNodeToSchedule %= nodesForScheduling.size(); + NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); + Container container = buildContainer(rmIdentifier, appParams, idCounter, + anyAsk, id, userName, nodeId); + List cList = containers.get(anyAsk.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(anyAsk.getCapability(), cList); + } + cList.add(container); + numAllocated++; + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); + } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); + } + + private Container buildContainer(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + ResourceRequest rr, ApplicationAttemptId id, String userName, + NodeId nodeId) throws YarnException { + ContainerId cId = + ContainerId.newContainerId(id, idCounter.generateContainerId()); + + // Normalize the resource asks (Similar to what the the RM scheduler does + // before accepting an ask) + Resource capability = normalizeCapability(appParams, rr); + + long currTime = System.currentTimeMillis(); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier( + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, + tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, + rr.getPriority(), currTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + ExecutionType.OPPORTUNISTIC); + byte[] pwd = + tokenSecretManager.createPassword(containerTokenIdentifier); + Token containerToken = newContainerToken(nodeId, pwd, + containerTokenIdentifier); + Container container = BuilderUtils.newContainer( + cId, nodeId, nodeId.getHost() + ":" + webpagePort, + capability, rr.getPriority(), containerToken, + containerTokenIdentifier.getExecutionType(), + rr.getAllocationRequestId()); + return container; + } + + private Resource normalizeCapability(AllocationParams appParams, + ResourceRequest ask) { + return Resources.normalize(RESOURCE_CALCULATOR, + ask.getCapability(), appParams.minResource, appParams.maxResource, + appParams.incrementResource); + } + + private static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), + nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + private PartitionedResourceRequests partitionAskList(List + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java new file mode 100644 index 0000000..1b701ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; + +/** + * This encapsulates application specific information used by the + * Opportunistic Container Allocator to allocate containers. + */ +public class OpportunisticContainerContext { + + private static final Logger LOG = LoggerFactory + .getLogger(OpportunisticContainerContext.class); + + // Currently just used to keep track of allocated containers. + // Can be used for reporting stats later. + private Set containersAllocated = new HashSet<>(); + private AllocationParams appParams = + new AllocationParams(); + private ContainerIdGenerator containerIdGenerator = + new ContainerIdGenerator(); + + private Map nodeMap = new LinkedHashMap<>(); + + // Mapping of NodeId to NodeTokens. Populated either from RM response or + // generated locally if required. + private Map nodeTokens = new HashMap<>(); + private final Set blacklist = new HashSet<>(); + + // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, + // Resource Name (Host/rack/any) and capability. This mapping is required + // to match a received Container to an outstanding OPPORTUNISTIC + // ResourceRequest (ask). + private final TreeMap> + outstandingOpReqs = new TreeMap<>(); + + public Set getContainersAllocated() { + return containersAllocated; + } + + public OpportunisticContainerAllocator.AllocationParams getAppParams() { + return appParams; + } + + public ContainerIdGenerator getContainerIdGenerator() { + return containerIdGenerator; + } + + public void setContainerIdGenerator( + ContainerIdGenerator containerIdGenerator) { + this.containerIdGenerator = containerIdGenerator; + } + + public Map getNodeMap() { + return nodeMap; + } + + public Map getNodeTokens() { + return nodeTokens; + } + + public Set getBlacklist() { + return blacklist; + } + + public TreeMap> + getOutstandingOpReqs() { + return outstandingOpReqs; + } + + /** + * Takes a list of ResourceRequests (asks), extracts the key information viz. + * (Priority, ResourceName, Capability) and adds to the outstanding + * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce + * the current YARN constraint that only a single ResourceRequest can exist at + * a give Priority and Capability. + * + * @param resourceAsks the list with the {@link ResourceRequest}s + */ + public void addToOutstandingReqs(List resourceAsks) { + for (ResourceRequest request : resourceAsks) { + Priority priority = request.getPriority(); + + // TODO: Extend for Node/Rack locality. We only handle ANY requests now + if (!ResourceRequest.isAnyLocation(request.getResourceName())) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map reqMap = + outstandingOpReqs.get(priority); + if (reqMap == null) { + reqMap = new HashMap<>(); + outstandingOpReqs.put(priority, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + + ", with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + /** + * This method matches a returned list of Container Allocations to any + * outstanding OPPORTUNISTIC ResourceRequest. + * @param capability Capability + * @param allocatedContainers Allocated Containers + */ + public void matchAllocationToOutstandingRequest(Resource capability, + List allocatedContainers) { + for (Container c : allocatedContainers) { + containersAllocated.add(c.getId()); + Map asks = + outstandingOpReqs.get(c.getPriority()); + + if (asks == null) { + continue; + } + + ResourceRequest rr = asks.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) { + asks.remove(capability); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java new file mode 100644 index 0000000..dd56829 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utility classes used for Scheduling. + */ +package org.apache.hadoop.yarn.server.scheduler; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index cfcf1bd..8ef5899 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f7d226e..84c6eeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -71,7 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; @@ -327,7 +327,8 @@ public class NodeManager extends CompositeService addService(nodeHealthChecker); boolean isDistSchedulingEnabled = - conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + conf.getBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); this.context = createNMContext(containerTokenSecretManager, @@ -361,8 +362,8 @@ public class NodeManager extends CompositeService ((NMContext) context).setWebServer(webServer); ((NMContext) context).setQueueableContainerAllocator( - new OpportunisticContainerAllocator(nodeStatusUpdater, context, - webServer.getPort())); + new OpportunisticContainerAllocator( + context.getContainerTokenSecretManager(), webServer.getPort())); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 75fe022..efbdfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -62,7 +62,7 @@ public final class DefaultRequestInterceptor extends AbstractRequestInterceptor { private static final Logger LOG = LoggerFactory .getLogger(DefaultRequestInterceptor.class); - private DistributedSchedulingAMProtocol rmClient; + private ApplicationMasterProtocol rmClient; private UserGroupInformation user = null; @Override @@ -76,15 +76,7 @@ public final class DefaultRequestInterceptor extends user.addToken(appContext.getAMRMToken()); final Configuration conf = this.getConf(); - rmClient = user.doAs( - new PrivilegedExceptionAction() { - @Override - public DistributedSchedulingAMProtocol run() throws Exception { - setAMRMTokenService(conf); - return ServerRMProxy.createRMProxy(conf, - DistributedSchedulingAMProtocol.class); - } - }); + rmClient = createRMClient(appContext, conf); } catch (IOException e) { String message = "Error while creating of RM app master service proxy for attemptId:" @@ -100,6 +92,32 @@ public final class DefaultRequestInterceptor extends } } + private ApplicationMasterProtocol createRMClient( + AMRMProxyApplicationContext appContext, final Configuration conf) + throws IOException, InterruptedException { + if (appContext.getNMCotext().isDistributedSchedulingEnabled()) { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public DistributedSchedulingAMProtocol run() throws Exception { + setAMRMTokenService(conf); + return ServerRMProxy.createRMProxy(conf, + DistributedSchedulingAMProtocol.class); + } + }); + } else { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + setAMRMTokenService(conf); + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + } + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( final RegisterApplicationMasterRequest request) @@ -127,9 +145,15 @@ public final class DefaultRequestInterceptor extends registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { - LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + - "request to the real YARN RM"); - return rmClient.registerApplicationMasterForDistributedScheduling(request); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + + "request to the real YARN RM"); + return ((DistributedSchedulingAMProtocol)rmClient) + .registerApplicationMasterForDistributedScheduling(request); + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); + } } @Override @@ -140,13 +164,18 @@ public final class DefaultRequestInterceptor extends LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); } - DistributedSchedulingAllocateResponse allocateResponse = - rmClient.allocateForDistributedScheduling(request); - if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + DistributedSchedulingAllocateResponse allocateResponse = + ((DistributedSchedulingAMProtocol)rmClient) + .allocateForDistributedScheduling(request); + if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + } + return allocateResponse; + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); } - - return allocateResponse; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index bfb12ee..368858c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; - - - import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; /** *

The DistributedScheduler runs on the NodeManager and is modeled as an @@ -76,74 +65,49 @@ import java.util.TreeMap; */ public final class DistributedScheduler extends AbstractRequestInterceptor { - static class PartitionedResourceRequests { - private List guaranteed = new ArrayList<>(); - private List opportunistic = new ArrayList<>(); - public List getGuaranteed() { - return guaranteed; - } - public List getOpportunistic() { - return opportunistic; - } - } - - static class DistributedSchedulerParams { - Resource maxResource; - Resource minResource; - Resource incrementResource; - int containerTokenExpiryInterval; - } - private static final Logger LOG = LoggerFactory .getLogger(DistributedScheduler.class); private final static RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set containersAllocated = new HashSet<>(); - - private DistributedSchedulerParams appParams = - new DistributedSchedulerParams(); - private final OpportunisticContainerAllocator.ContainerIdCounter - containerIdCounter = - new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map nodeList = new LinkedHashMap<>(); - - // Mapping of NodeId to NodeTokens. Populated either from RM response or - // generated locally if required. - private Map nodeTokens = new HashMap<>(); - final Set blacklist = new HashSet<>(); - - // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, - // Resource Name (Host/rack/any) and capability. This mapping is required - // to match a received Container to an outstanding OPPORTUNISTIC - // ResourceRequest (ask). - final TreeMap> - outstandingOpReqs = new TreeMap<>(); + private OpportunisticContainerContext oppContainerContext = + new OpportunisticContainerContext(); private ApplicationAttemptId applicationAttemptId; private OpportunisticContainerAllocator containerAllocator; private NMTokenSecretManagerInNM nmSecretManager; private String appSubmitter; - - public void init(AMRMProxyApplicationContext appContext) { - super.init(appContext); - initLocal(appContext.getApplicationAttemptId(), - appContext.getNMCotext().getContainerAllocator(), - appContext.getNMCotext().getNMTokenSecretManager(), - appContext.getUser()); + private long rmIdentifier; + + public void init(AMRMProxyApplicationContext applicationContext) { + super.init(applicationContext); + initLocal(applicationContext.getNMCotext().getNodeStatusUpdater() + .getRMIdentifier(), + applicationContext.getApplicationAttemptId(), + applicationContext.getNMCotext().getContainerAllocator(), + applicationContext.getNMCotext().getNMTokenSecretManager(), + applicationContext.getUser()); } @VisibleForTesting - void initLocal(ApplicationAttemptId applicationAttemptId, - OpportunisticContainerAllocator containerAllocator, + void initLocal(long rmId, ApplicationAttemptId appAttemptId, + OpportunisticContainerAllocator oppContainerAllocator, NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { - this.applicationAttemptId = applicationAttemptId; - this.containerAllocator = containerAllocator; + this.rmIdentifier = rmId; + this.applicationAttemptId = appAttemptId; + this.containerAllocator = oppContainerAllocator; this.nmSecretManager = nmSecretManager; this.appSubmitter = appSubmitter; + + // Overrides the Generator to decrement container id. + this.oppContainerContext.setContainerIdGenerator( + new OpportunisticContainerAllocator.ContainerIdGenerator() { + @Override + public long generateContainerId() { + return this.containerIdCounter.decrementAndGet(); + } + }); } /** @@ -202,7 +166,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { if (allocatedContainers.size() > 0) { response.getAllocatedContainers().addAll(allocatedContainers); for (Container alloc : allocatedContainers) { - if (!nodeTokens.containsKey(alloc.getNodeId())) { + if (!oppContainerContext.getNodeTokens().containsKey( + alloc.getNodeId())) { newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc)); } } @@ -212,115 +177,34 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { } } - private PartitionedResourceRequests partitionAskList(List - askList) { - PartitionedResourceRequests partitionedRequests = - new PartitionedResourceRequests(); - for (ResourceRequest rr : askList) { - if (rr.getExecutionTypeRequest().getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - partitionedRequests.getOpportunistic().add(rr); - } else { - partitionedRequests.getGuaranteed().add(rr); - } - } - return partitionedRequests; - } - private void updateParameters( RegisterDistributedSchedulingAMResponse registerResponse) { - appParams.minResource = registerResponse.getMinContainerResource(); - appParams.maxResource = registerResponse.getMaxContainerResource(); - appParams.incrementResource = - registerResponse.getIncrContainerResource(); - if (appParams.incrementResource == null) { - appParams.incrementResource = appParams.minResource; + oppContainerContext.getAppParams().setMinResource( + registerResponse.getMinContainerResource()); + oppContainerContext.getAppParams().setMaxResource( + registerResponse.getMaxContainerResource()); + oppContainerContext.getAppParams().setIncrementResource( + registerResponse.getIncrContainerResource()); + if (oppContainerContext.getAppParams().getIncrementResource() == null) { + oppContainerContext.getAppParams().setIncrementResource( + oppContainerContext.getAppParams().getMinResource()); } - appParams.containerTokenExpiryInterval = registerResponse - .getContainerTokenExpiryInterval(); + oppContainerContext.getAppParams().setContainerTokenExpiryInterval( + registerResponse.getContainerTokenExpiryInterval()); - containerIdCounter + oppContainerContext.getContainerIdGenerator() .resetContainerIdCounter(registerResponse.getContainerIdStart()); setNodeList(registerResponse.getNodesForScheduling()); } - /** - * Takes a list of ResourceRequests (asks), extracts the key information viz. - * (Priority, ResourceName, Capability) and adds to the outstanding - * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce - * the current YARN constraint that only a single ResourceRequest can exist at - * a give Priority and Capability. - * - * @param resourceAsks the list with the {@link ResourceRequest}s - */ - public void addToOutstandingReqs(List resourceAsks) { - for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); - - // TODO: Extend for Node/Rack locality. We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map reqMap = - this.outstandingOpReqs.get(priority); - if (reqMap == null) { - reqMap = new HashMap<>(); - this.outstandingOpReqs.put(priority, reqMap); - } - - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); - } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); - } - if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority - + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); - } - } - } - - /** - * This method matches a returned list of Container Allocations to any - * outstanding OPPORTUNISTIC ResourceRequest. - */ - private void matchAllocationToOutstandingRequest(Resource capability, - List allocatedContainers) { - for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); - Map asks = - outstandingOpReqs.get(c.getPriority()); - - if (asks == null) - continue; - - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { - asks.remove(capability); - } - } - } - } - private void setNodeList(List nodeList) { - this.nodeList.clear(); + oppContainerContext.getNodeMap().clear(); addToNodeList(nodeList); } private void addToNodeList(List nodes) { for (NodeId n : nodes) { - this.nodeList.put(n.getHost(), n); + oppContainerContext.getNodeMap().put(n.getHost(), n); } } @@ -345,52 +229,13 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { LOG.debug("Forwarding allocate request to the" + "Distributed Scheduler Service on YARN RM"); } - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAllocateRequest().getAskList()); - - List releasedContainers = - request.getAllocateRequest().getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - containersAllocated.removeAll(releasedContainers); - } - - // Also, update black list - ResourceBlacklistRequest rbr = - request.getAllocateRequest().getResourceBlacklistRequest(); - if (rbr != null) { - blacklist.removeAll(rbr.getBlacklistRemovals()); - blacklist.addAll(rbr.getBlacklistAdditions()); - } - - // Add OPPORTUNISTIC reqs to the outstanding reqs - addToOutstandingReqs(partitionedAsks.getOpportunistic()); - - List allocatedContainers = new ArrayList<>(); - for (Priority priority : outstandingOpReqs.descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given Cap (The actual container size - // might be different than what is requested.. which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map> allocated = - containerAllocator.allocate(this.appParams, containerIdCounter, - outstandingOpReqs.get(priority).values(), blacklist, - applicationAttemptId, nodeList, appSubmitter); - for (Map.Entry> e : allocated.entrySet()) { - matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); - } - } + List allocatedContainers = + containerAllocator.allocateContainers( + request.getAllocateRequest(), applicationAttemptId, + oppContainerContext, rmIdentifier, appSubmitter); request.setAllocatedContainers(allocatedContainers); - // Send all the GUARANTEED Reqs to RM - request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); @@ -398,7 +243,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { setNodeList(dsResp.getNodesForScheduling()); List nmTokens = dsResp.getAllocateResponse().getNMTokens(); for (NMToken nmToken : nmTokens) { - nodeTokens.put(nmToken.getNodeId(), nmToken); + oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken); } List completedContainers = @@ -407,7 +252,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { // Only account for opportunistic containers for (ContainerStatus cs : completedContainers) { if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); + oppContainerContext.getContainersAllocated() + .remove(cs.getContainerId()); } } @@ -417,9 +263,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { dsResp.getAllocateResponse(), nmTokens, allocatedContainers); if (LOG.isDebugEnabled()) { - LOG.debug( - "Number of opportunistic containers currently allocated by" + - "application: " + containersAllocated.size()); + LOG.debug("Number of opportunistic containers currently" + + "allocated by application: " + oppContainerContext + .getContainersAllocated().size()); } return dsResp; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java deleted file mode 100644 index 4723233..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.scheduler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.api.ContainerType; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.net.InetSocketAddress; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicLong; - -/** - *

- * The OpportunisticContainerAllocator allocates containers on a given list of - * nodes, after modifying the container sizes to respect the limits set by the - * ResourceManager. It tries to distribute the containers as evenly as possible. - * It also uses the NMTokenSecretManagerInNM to generate the - * required NM tokens for the allocated containers. - *

- */ -public class OpportunisticContainerAllocator { - - private static final Log LOG = - LogFactory.getLog(OpportunisticContainerAllocator.class); - - private static final ResourceCalculator RESOURCE_CALCULATOR = - new DominantResourceCalculator(); - - static class ContainerIdCounter { - final AtomicLong containerIdCounter = new AtomicLong(1); - - void resetContainerIdCounter(long containerIdStart) { - this.containerIdCounter.set(containerIdStart); - } - - long generateContainerId() { - return this.containerIdCounter.decrementAndGet(); - } - } - - private final NodeStatusUpdater nodeStatusUpdater; - private final Context context; - private int webpagePort; - - public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater, - Context context, int webpagePort) { - this.nodeStatusUpdater = nodeStatusUpdater; - this.context = context; - this.webpagePort = webpagePort; - } - - public Map> allocate( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Collection resourceAsks, Set blacklist, - ApplicationAttemptId appAttId, Map allNodes, - String userName) throws YarnException { - Map> containers = new HashMap<>(); - for (ResourceRequest anyAsk : resourceAsks) { - allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, - allNodes, userName, containers, anyAsk); - LOG.info("Opportunistic allocation requested for [" - + "priority=" + anyAsk.getPriority() - + ", num_containers=" + anyAsk.getNumContainers() - + ", capability=" + anyAsk.getCapability() + "]" - + " allocated = " + containers.get(anyAsk.getCapability()).size()); - } - return containers; - } - - private void allocateOpportunisticContainers( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, - Map> containers, ResourceRequest anyAsk) - throws YarnException { - int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? 0 : - containers.get(anyAsk.getCapability()).size()); - - List nodesForScheduling = new ArrayList<>(); - for (Entry nodeEntry : allNodes.entrySet()) { - // Do not use blacklisted nodes for scheduling. - if (blacklist.contains(nodeEntry.getKey())) { - continue; - } - nodesForScheduling.add(nodeEntry.getValue()); - } - int numAllocated = 0; - int nextNodeToSchedule = 0; - for (int numCont = 0; numCont < toAllocate; numCont++) { - nextNodeToSchedule++; - nextNodeToSchedule %= nodesForScheduling.size(); - NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); - Container container = buildContainer(appParams, idCounter, anyAsk, id, - userName, nodeId); - List cList = containers.get(anyAsk.getCapability()); - if (cList == null) { - cList = new ArrayList<>(); - containers.put(anyAsk.getCapability(), cList); - } - cList.add(container); - numAllocated++; - LOG.info("Allocated [" + container.getId() + "] as opportunistic."); - } - LOG.info("Allocated " + numAllocated + " opportunistic containers."); - } - - private Container buildContainer(DistributedSchedulerParams appParams, - ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, - String userName, NodeId nodeId) throws YarnException { - ContainerId cId = - ContainerId.newContainerId(id, idCounter.generateContainerId()); - - // Normalize the resource asks (Similar to what the the RM scheduler does - // before accepting an ask) - Resource capability = normalizeCapability(appParams, rr); - - long currTime = System.currentTimeMillis(); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier( - cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, - capability, currTime + appParams.containerTokenExpiryInterval, - context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), - nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, - ExecutionType.OPPORTUNISTIC); - byte[] pwd = - context.getContainerTokenSecretManager().createPassword( - containerTokenIdentifier); - Token containerToken = newContainerToken(nodeId, pwd, - containerTokenIdentifier); - Container container = BuilderUtils.newContainer( - cId, nodeId, nodeId.getHost() + ":" + webpagePort, - capability, rr.getPriority(), containerToken, - containerTokenIdentifier.getExecutionType(), - rr.getAllocationRequestId()); - return container; - } - - private Resource normalizeCapability(DistributedSchedulerParams appParams, - ResourceRequest ask) { - return Resources.normalize(RESOURCE_CALCULATOR, - ask.getCapability(), appParams.minResource, appParams.maxResource, - appParams.incrementResource); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), - nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 4f726d4..a41f865 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5f7edb79/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index b093b3b..8f1ae7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAl import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -189,7 +190,6 @@ public class TestDistributedScheduler { DistributedScheduler distributedScheduler) { NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); - Context context = Mockito.mock(Context.class); NMContainerTokenSecretManager nmContainerTokenSecretManager = new NMContainerTokenSecretManager(conf); MasterKey mKey = new MasterKey() { @@ -207,15 +207,13 @@ public class TestDistributedScheduler { public void setBytes(ByteBuffer bytes) {} }; nmContainerTokenSecretManager.setMasterKey(mKey); - Mockito.when(context.getContainerTokenSecretManager()).thenReturn - (nmContainerTokenSecretManager); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); nmTokenSecretManagerInNM.setMasterKey(mKey); - distributedScheduler.initLocal( + distributedScheduler.initLocal(1234, ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), containerAllocator, nmTokenSecretManagerInNM, "test"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org