Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9F46B200D08 for ; Wed, 23 Aug 2017 01:16:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9D9D6167C27; Tue, 22 Aug 2017 23:16:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4CB0E167BA7 for ; Wed, 23 Aug 2017 01:16:00 +0200 (CEST) Received: (qmail 32261 invoked by uid 500); 22 Aug 2017 23:15:38 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 29811 invoked by uid 99); 22 Aug 2017 23:15:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Aug 2017 23:15:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 058CCF5590; Tue, 22 Aug 2017 23:15:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Tue, 22 Aug 2017 23:16:02 -0000 Message-Id: In-Reply-To: <8e0831b00e74463fa1901c9afe932257@git.apache.org> References: <8e0831b00e74463fa1901c9afe932257@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/42] hadoop git commit: YARN-5587. Add support for resource profiles. (vvasudev via asuresh) archived-at: Tue, 22 Aug 2017 23:16:02 -0000 YARN-5587. Add support for resource profiles. (vvasudev via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02fb8fb4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02fb8fb4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02fb8fb4 Branch: refs/heads/YARN-3926 Commit: 02fb8fb44276b46602102a862199fdce06157f29 Parents: 9ae6c06 Author: Arun Suresh Authored: Tue Nov 15 01:01:07 2016 -0800 Committer: Wangda Tan Committed: Tue Aug 22 16:05:18 2017 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 4 + .../RegisterApplicationMasterResponse.java | 8 + .../yarn/api/records/ProfileCapability.java | 94 ++++++++++- .../hadoop/yarn/api/records/Resource.java | 14 ++ .../yarn/api/records/ResourceInformation.java | 57 ++++++- .../yarn/api/records/ResourceRequest.java | 43 ++++- .../hadoop-yarn/hadoop-yarn-client/pom.xml | 1 + .../hadoop/yarn/client/api/AMRMClient.java | 117 +++++++++++++- .../yarn/client/api/impl/AMRMClientImpl.java | 152 ++++++++++------- .../client/api/impl/RemoteRequestsTable.java | 109 +++++++++---- .../yarn/client/api/impl/TestAMRMClient.java | 141 ++++++++++++++-- .../impl/TestAMRMClientContainerRequest.java | 8 +- .../api/impl/TestDistributedScheduling.java | 12 +- .../yarn/client/api/impl/TestNMClient.java | 5 +- .../TestOpportunisticContainerAllocation.java | 31 ++-- .../src/test/resources/resource-profiles.json | 18 +++ ...RegisterApplicationMasterResponsePBImpl.java | 58 +++++++ .../api/records/impl/pb/ResourcePBImpl.java | 4 +- .../records/impl/pb/ResourceRequestPBImpl.java | 41 ++++- .../yarn/util/resource/ResourceUtils.java | 161 ++++++++++++++++++- .../hadoop/yarn/util/resource/Resources.java | 10 +- .../ApplicationMasterService.java | 1 + .../resourcemanager/DefaultAMSProcessor.java | 8 + .../server/resourcemanager/RMServerUtils.java | 50 ++++++ .../resource/ResourceProfilesManagerImpl.java | 4 + .../scheduler/AbstractYarnScheduler.java | 44 +++++ .../scheduler/ClusterNodeTracker.java | 3 +- .../scheduler/SchedulerUtils.java | 10 ++ .../scheduler/capacity/CapacityScheduler.java | 4 +- .../scheduler/fair/FairScheduler.java | 4 +- .../scheduler/fifo/FifoScheduler.java | 13 +- .../yarn/server/resourcemanager/MockRM.java | 2 + .../server/resourcemanager/TestAppManager.java | 1 + .../TestApplicationMasterService.java | 35 ++++ .../scheduler/fair/TestFairScheduler.java | 4 + .../hadoop/yarn/server/MiniYARNCluster.java | 2 + 36 files changed, 1100 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6825a36..ce7a9c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -154,6 +154,10 @@ + + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 0b886dd..8fa8563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -204,4 +204,12 @@ public abstract class RegisterApplicationMasterResponse { @Unstable public abstract void setSchedulerResourceTypes( EnumSet types); + + @Public + @Unstable + public abstract Map getResourceProfiles(); + + @Private + @Unstable + public abstract void setResourceProfiles(Map profiles); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index 0a93b89..faaddd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -18,41 +18,93 @@ package org.apache.hadoop.yarn.api.records; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.util.Records; +import java.util.Map; + /** * Class to capture capability requirements when using resource profiles. The * ProfileCapability is meant to be used as part of the ResourceRequest. A * profile capability has two pieces - the resource profile name and the * overrides. The resource profile specifies the name of the resource profile * to be used and the capability override is the overrides desired on specific - * resource types. For example, you could use the "minimum" profile and set the - * memory in the capability override to 4096M. This implies that you wish for - * the resources specified in the "minimum" profile but with 4096M memory. The - * conversion from the ProfileCapability to the Resource class with the actual - * resource requirements will be done by the ResourceManager, which has the - * actual profile to Resource mapping. + * resource types. + * + * For example, if you have a resource profile "small" that maps to + * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to + * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on + * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}. + * + * Note that the conversion from the ProfileCapability to the Resource class + * with the actual resource requirements will be done by the ResourceManager, + * which has the actual profile to Resource mapping. + * */ @InterfaceAudience.Public @InterfaceStability.Unstable public abstract class ProfileCapability { + public static final String DEFAULT_PROFILE = "default"; + + public static ProfileCapability newInstance(Resource override) { + return newInstance(DEFAULT_PROFILE, override); + } + + public static ProfileCapability newInstance(String profile) { + Preconditions + .checkArgument(profile != null, "The profile name cannot be null"); + ProfileCapability obj = Records.newRecord(ProfileCapability.class); + obj.setProfileName(profile); + obj.setProfileCapabilityOverride(Resource.newInstance(0, 0)); + return obj; + } + public static ProfileCapability newInstance(String profile, Resource override) { + Preconditions + .checkArgument(profile != null, "The profile name cannot be null"); ProfileCapability obj = Records.newRecord(ProfileCapability.class); obj.setProfileName(profile); obj.setProfileCapabilityOverride(override); return obj; } + /** + * Get the profile name. + * @return the profile name + */ public abstract String getProfileName(); + /** + * Get the profile capability override. + * @return Resource object containing the override. + */ public abstract Resource getProfileCapabilityOverride(); + /** + * Set the resource profile name. + * @param profileName the resource profile name + */ public abstract void setProfileName(String profileName); + /** + * Set the capability override to override specific resource types on the + * resource profile. + * + * For example, if you have a resource profile "small" that maps to + * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to + * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on + * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}. + * + * Note that the conversion from the ProfileCapability to the Resource class + * with the actual resource requirements will be done by the ResourceManager, + * which has the actual profile to Resource mapping. + * + * @param r Resource object containing the capability override + */ public abstract void setProfileCapabilityOverride(Resource r); @Override @@ -85,4 +137,34 @@ public abstract class ProfileCapability { return "{ profile: " + this.getProfileName() + ", capabilityOverride: " + this.getProfileCapabilityOverride() + " }"; } + + /** + * Get a representation of the capability as a Resource object. + * @param capability the capability we wish to convert + * @param resourceProfilesMap map of profile name to Resource object + * @return Resource object representing the capability + */ + public static Resource toResource(ProfileCapability capability, + Map resourceProfilesMap) { + Preconditions + .checkArgument(capability != null, "Capability cannot be null"); + Preconditions.checkArgument(resourceProfilesMap != null, + "Resource profiles map cannot be null"); + Resource resource = Resource.newInstance(0, 0); + + if (resourceProfilesMap.containsKey(capability.getProfileName())) { + resource = Resource + .newInstance(resourceProfilesMap.get(capability.getProfileName())); + } + + if(capability.getProfileCapabilityOverride()!= null) { + for (Map.Entry entry : capability + .getProfileCapabilityOverride().getResources().entrySet()) { + if (entry.getValue() != null && entry.getValue().getValue() != 0) { + resource.setResourceInformation(entry.getKey(), entry.getValue()); + } + } + } + return resource; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 507247e..c349a32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.api.records; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -101,6 +103,18 @@ public abstract class Resource implements Comparable { return new SimpleResource(memory, vCores); } + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Resource newInstance(Resource resource) { + Resource ret = Resource.newInstance(0, 0); + for (Map.Entry entry : resource.getResources() + .entrySet()) { + ret.setResourceInformation(entry.getKey(), + ResourceInformation.newInstance(entry.getValue())); + } + return ret; + } + /** * This method is DEPRECATED: * Use {@link Resource#getMemorySize()} instead http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index a17e81b..7d74efc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable { private String units; private ResourceTypes resourceType; private Long value; + private Long minimumAllocation; + private Long maximumAllocation; private static final String MEMORY_URI = "memory-mb"; private static final String VCORES_URI = "vcores"; @@ -118,6 +120,42 @@ public class ResourceInformation implements Comparable { } /** + * Get the minimum allocation for the resource. + * + * @return the minimum allocation for the resource + */ + public Long getMinimumAllocation() { + return minimumAllocation; + } + + /** + * Set the minimum allocation for the resource. + * + * @param minimumAllocation the minimum allocation for the resource + */ + public void setMinimumAllocation(Long minimumAllocation) { + this.minimumAllocation = minimumAllocation; + } + + /** + * Get the maximum allocation for the resource. + * + * @return the maximum allocation for the resource + */ + public Long getMaximumAllocation() { + return maximumAllocation; + } + + /** + * Set the maximum allocation for the resource. + * + * @param maximumAllocation the maximum allocation for the resource + */ + public void setMaximumAllocation(Long maximumAllocation) { + this.maximumAllocation = maximumAllocation; + } + + /** * Create a new instance of ResourceInformation from another object. * * @param other the object from which the new object should be created @@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable { ret.setResourceType(other.getResourceType()); ret.setUnits(other.getUnits()); ret.setValue(other.getValue()); + ret.setMinimumAllocation(other.getMinimumAllocation()); + ret.setMaximumAllocation(other.getMaximumAllocation()); return ret; } public static ResourceInformation newInstance(String name, String units, - Long value, ResourceTypes type) { + Long value, ResourceTypes type, Long minimumAllocation, + Long maximumAllocation) { ResourceInformation ret = new ResourceInformation(); ret.setName(name); ret.setResourceType(type); ret.setUnits(units); ret.setValue(value); + ret.setMinimumAllocation(minimumAllocation); + ret.setMaximumAllocation(maximumAllocation); return ret; } public static ResourceInformation newInstance(String name, String units, Long value) { return ResourceInformation - .newInstance(name, units, value, ResourceTypes.COUNTABLE); + .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name, String units) { return ResourceInformation - .newInstance(name, units, 0L, ResourceTypes.COUNTABLE); + .newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name, Long value) { return ResourceInformation - .newInstance(name, "", value, ResourceTypes.COUNTABLE); + .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L, + Long.MAX_VALUE); } public static ResourceInformation newInstance(String name) { @@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable { @Override public String toString() { return "name: " + this.name + ", units: " + this.units + ", type: " - + resourceType + ", value: " + value; + + resourceType + ", value: " + value + ", minimum allocation: " + + minimumAllocation + ", maximum allocation: " + maximumAllocation; } public String getShorthandRepresentation() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 5bedc87..c1339b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -98,7 +98,22 @@ public abstract class ResourceRequest implements Comparable { .resourceName(hostName).capability(capability) .numContainers(numContainers).relaxLocality(relaxLocality) .nodeLabelExpression(labelExpression) - .executionTypeRequest(executionTypeRequest).build(); + .executionTypeRequest(executionTypeRequest).profileCapability(null) + .build(); + } + + @Public + @Evolving + public static ResourceRequest newInstance(Priority priority, String hostName, + Resource capability, int numContainers, boolean relaxLocality, + String labelExpression, ExecutionTypeRequest executionTypeRequest, + ProfileCapability profile) { + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression) + .executionTypeRequest(executionTypeRequest).profileCapability(profile) + .build(); } @Public @@ -124,6 +139,7 @@ public abstract class ResourceRequest implements Comparable { resourceRequest.setRelaxLocality(true); resourceRequest.setExecutionTypeRequest( ExecutionTypeRequest.newInstance()); + resourceRequest.setProfileCapability(null); } /** @@ -238,6 +254,21 @@ public abstract class ResourceRequest implements Comparable { } /** + * Set the resourceProfile of the request. + * @see ResourceRequest#setProfileCapability(ProfileCapability) + * @param profileCapability + * profileCapability of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder profileCapability( + ProfileCapability profileCapability) { + resourceRequest.setProfileCapability(profileCapability); + return this; + } + + /** * Return generated {@link ResourceRequest} object. * @return {@link ResourceRequest} */ @@ -454,6 +485,14 @@ public abstract class ResourceRequest implements Comparable { @Evolving public abstract void setNodeLabelExpression(String nodelabelExpression); + @Public + @Evolving + public abstract ProfileCapability getProfileCapability(); + + @Public + @Evolving + public abstract void setProfileCapability(ProfileCapability p); + /** * Get the optional ID corresponding to this allocation request. This * ID is an identifier for different {@code ResourceRequest}s from the same @@ -529,12 +568,14 @@ public abstract class ResourceRequest implements Comparable { Resource capability = getCapability(); String hostName = getResourceName(); Priority priority = getPriority(); + ProfileCapability profile = getProfileCapability(); result = prime * result + ((capability == null) ? 0 : capability.hashCode()); result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + getNumContainers(); result = prime * result + ((priority == null) ? 0 : priority.hashCode()); result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode(); + result = prime * result + ((profile == null) ? 0 : profile.hashCode()); return result; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index b83bff8..8cbf4c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -144,6 +144,7 @@ src/test/resources/application_1440536969523_0001.har/part-0 src/test/resources/application_1440536969523_0001.har/_masterindex src/test/resources/application_1440536969523_0001.har/_SUCCESS + src/test/resources/resource-profiles.json http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 69f3777..a11275b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; @@ -125,6 +126,7 @@ public abstract class AMRMClient extends private boolean relaxLocality; private String nodeLabelsExpression; private ExecutionTypeRequest executionTypeRequest; + private String resourceProfile; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -171,6 +173,26 @@ public abstract class AMRMClient extends this(capability, nodes, racks, priority, allocationRequestId, true, null, ExecutionTypeRequest.newInstance()); } + /** + * Instantiates a {@link ContainerRequest} with the given constraints and + * locality relaxation enabled. + * + * @param capability + * The {@link ProfileCapability} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + */ + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority) { + this(capability, nodes, racks, priority, 0, true, null); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -199,6 +221,29 @@ public abstract class AMRMClient extends * Instantiates a {@link ContainerRequest} with the given constraints. * * @param capability + * The {@link ProfileCapability} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + */ + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, boolean relaxLocality) { + this(capability, nodes, racks, priority, 0, relaxLocality, null); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability * The {@link Resource} to be requested for each container. * @param nodes * Any hosts to request that the containers are placed on. @@ -285,6 +330,59 @@ public abstract class AMRMClient extends relaxLocality, nodeLabelsExpression, ExecutionTypeRequest.newInstance()); } + + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId, + boolean relaxLocality, String nodeLabelsExpression) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, + ExecutionTypeRequest.newInstance()); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource, now we only support + * asking for only a single node label + * @param executionTypeRequest + * Set the execution type of the container request. + */ + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression, + ExecutionTypeRequest executionTypeRequest) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, executionTypeRequest, + ProfileCapability.DEFAULT_PROFILE); + } + + public ContainerRequest(ProfileCapability capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId, + boolean relaxLocality, String nodeLabelsExpression, + ExecutionTypeRequest executionTypeRequest) { + this(capability.getProfileCapabilityOverride(), nodes, racks, priority, + allocationRequestId, relaxLocality, nodeLabelsExpression, + executionTypeRequest, capability.getProfileName()); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -312,11 +410,13 @@ public abstract class AMRMClient extends * asking for only a single node label * @param executionTypeRequest * Set the execution type of the container request. + * @param profile + * Set the resource profile for the container request */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, long allocationRequestId, boolean relaxLocality, String nodeLabelsExpression, - ExecutionTypeRequest executionTypeRequest) { + ExecutionTypeRequest executionTypeRequest, String profile) { this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); @@ -325,6 +425,7 @@ public abstract class AMRMClient extends this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; this.executionTypeRequest = executionTypeRequest; + this.resourceProfile = profile; sanityCheck(); } @@ -376,6 +477,10 @@ public abstract class AMRMClient extends return executionTypeRequest; } + public String getResourceProfile() { + return resourceProfile; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); @@ -383,6 +488,7 @@ public abstract class AMRMClient extends sb.append("AllocationRequestId[").append(allocationRequestId).append("]"); sb.append("ExecutionTypeRequest[").append(executionTypeRequest) .append("]"); + sb.append("Resource Profile[").append(resourceProfile).append("]"); return sb.toString(); } @@ -635,6 +741,15 @@ public abstract class AMRMClient extends " AMRMClient is expected to implement this !!"); } + + @InterfaceStability.Evolving + public List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + ProfileCapability capability) { + throw new UnsupportedOperationException("The sub-class extending" + + " AMRMClient is expected to implement this !!"); + } + /** * Get outstanding ContainerRequests matching the given * allocationRequestId. These ContainerRequests should have been added via http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 7a21bc6..8e66c20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -105,56 +106,56 @@ public class AMRMClientImpl extends AMRMClient { protected final Set blacklistedNodes = new HashSet(); protected final Set blacklistAdditions = new HashSet(); protected final Set blacklistRemovals = new HashSet(); + + protected Map resourceProfilesMap; static class ResourceRequestInfo { ResourceRequest remoteRequest; LinkedHashSet containerRequests; - + ResourceRequestInfo(Long allocationRequestId, Priority priority, - String resourceName, Resource capability, boolean relaxLocality) { + String resourceName, Resource capability, boolean relaxLocality, + String resourceProfile) { + ProfileCapability profileCapability = ProfileCapability + .newInstance(resourceProfile, capability); remoteRequest = ResourceRequest.newBuilder().priority(priority) .resourceName(resourceName).capability(capability).numContainers(0) - .allocationRequestId(allocationRequestId) - .relaxLocality(relaxLocality).build(); + .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality) + .profileCapability(profileCapability).build(); containerRequests = new LinkedHashSet(); } } /** - * Class compares Resource by memory then cpu in reverse order + * Class compares Resource by memory, then cpu and then the remaining resource + * types in reverse order. */ - static class ResourceReverseMemoryThenCpuComparator implements - Comparator, Serializable { - static final long serialVersionUID = 12345L; - @Override - public int compare(Resource arg0, Resource arg1) { - long mem0 = arg0.getMemorySize(); - long mem1 = arg1.getMemorySize(); - long cpu0 = arg0.getVirtualCores(); - long cpu1 = arg1.getVirtualCores(); - if(mem0 == mem1) { - if(cpu0 == cpu1) { - return 0; - } - if(cpu0 < cpu1) { - return 1; - } - return -1; - } - if(mem0 < mem1) { - return 1; - } - return -1; - } + static class ProfileCapabilityComparator + implements Comparator { + + HashMap resourceProfilesMap; + + public ProfileCapabilityComparator( + HashMap resourceProfileMap) { + this.resourceProfilesMap = resourceProfileMap; + } + + public int compare(T arg0, T arg1) { + Resource resource0 = + ProfileCapability.toResource(arg0, resourceProfilesMap); + Resource resource1 = + ProfileCapability.toResource(arg1, resourceProfilesMap); + return resource1.compareTo(resource0); + } } - static boolean canFit(Resource arg0, Resource arg1) { - long mem0 = arg0.getMemorySize(); - long mem1 = arg1.getMemorySize(); - long cpu0 = arg0.getVirtualCores(); - long cpu1 = arg1.getVirtualCores(); - - return (mem0 <= mem1 && cpu0 <= cpu1); + boolean canFit(ProfileCapability arg0, ProfileCapability arg1) { + Resource resource0 = + ProfileCapability.toResource(arg0, resourceProfilesMap); + Resource resource1 = + ProfileCapability.toResource(arg1, resourceProfilesMap); + return Resources.fitsIn(resource0, resource1); + } private final Map> remoteRequests = @@ -233,6 +234,7 @@ public class AMRMClientImpl extends AMRMClient { return registerApplicationMaster(); } + @SuppressWarnings("unchecked") private RegisterApplicationMasterResponse registerApplicationMaster() throws YarnException, IOException { RegisterApplicationMasterRequest request = @@ -245,6 +247,7 @@ public class AMRMClientImpl extends AMRMClient { if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + this.resourceProfilesMap = response.getResourceProfiles(); } return response; } @@ -416,13 +419,15 @@ public class AMRMClientImpl extends AMRMClient { for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - ResourceRequest rr = ResourceRequest.newBuilder() - .priority(r.getPriority()).resourceName(r.getResourceName()) - .capability(r.getCapability()).numContainers(r.getNumContainers()) - .relaxLocality(r.getRelaxLocality()) - .nodeLabelExpression(r.getNodeLabelExpression()) - .executionTypeRequest(r.getExecutionTypeRequest()) - .allocationRequestId(r.getAllocationRequestId()).build(); + ResourceRequest rr = + ResourceRequest.newBuilder().priority(r.getPriority()) + .resourceName(r.getResourceName()).capability(r.getCapability()) + .numContainers(r.getNumContainers()) + .relaxLocality(r.getRelaxLocality()) + .nodeLabelExpression(r.getNodeLabelExpression()) + .executionTypeRequest(r.getExecutionTypeRequest()) + .allocationRequestId(r.getAllocationRequestId()) + .profileCapability(r.getProfileCapability()).build(); askList.add(rr); } return askList; @@ -504,6 +509,8 @@ public class AMRMClientImpl extends AMRMClient { public synchronized void addContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); + ProfileCapability profileCapability = ProfileCapability + .newInstance(req.getResourceProfile(), req.getCapability()); Set dedupedRacks = new HashSet(); if (req.getRacks() != null) { dedupedRacks.addAll(req.getRacks()); @@ -516,6 +523,8 @@ public class AMRMClientImpl extends AMRMClient { Set inferredRacks = resolveRacks(req.getNodes()); inferredRacks.removeAll(dedupedRacks); + checkResourceProfile(req.getResourceProfile()); + // check that specific and non-specific requests cannot be mixed within a // priority checkLocalityRelaxationConflict(req.getAllocationRequestId(), @@ -540,26 +549,26 @@ public class AMRMClientImpl extends AMRMClient { } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), req.getCapability(), req, true, + req.getExecutionTypeRequest(), profileCapability, req, true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - req.getCapability(), req, true, req.getNodeLabelExpression()); + profileCapability, req, true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - req.getCapability(), req, req.getRelaxLocality(), + profileCapability, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), req.getCapability(), req, + req.getExecutionTypeRequest(), profileCapability, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } @@ -567,6 +576,8 @@ public class AMRMClientImpl extends AMRMClient { public synchronized void removeContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); + ProfileCapability profileCapability = ProfileCapability + .newInstance(req.getResourceProfile(), req.getCapability()); Set allRacks = new HashSet(); if (req.getRacks() != null) { allRacks.addAll(req.getRacks()); @@ -577,17 +588,17 @@ public class AMRMClientImpl extends AMRMClient { if (req.getNodes() != null) { for (String node : new HashSet(req.getNodes())) { decResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } } for (String rack : allRacks) { decResourceRequest(req.getPriority(), rack, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), req.getCapability(), req); + req.getExecutionTypeRequest(), profileCapability, req); } @Override @@ -686,6 +697,17 @@ public class AMRMClientImpl extends AMRMClient { public synchronized List> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, Resource capability) { + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); + return getMatchingRequests(priority, resourceName, executionType, + profileCapability); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + ProfileCapability capability) { Preconditions.checkArgument(capability != null, "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, @@ -695,22 +717,22 @@ public class AMRMClientImpl extends AMRMClient { RemoteRequestsTable remoteRequestsTable = getTable(0); if (null != remoteRequestsTable) { - List> matchingRequests = - remoteRequestsTable.getMatchingRequests(priority, resourceName, - executionType, capability); + List> matchingRequests = remoteRequestsTable + .getMatchingRequests(priority, resourceName, executionType, + capability); if (null != matchingRequests) { // If no exact match. Container may be larger than what was requested. // get all resources <= capability. map is reverse sorted. for (ResourceRequestInfo resReqInfo : matchingRequests) { - if (canFit(resReqInfo.remoteRequest.getCapability(), capability) && - !resReqInfo.containerRequests.isEmpty()) { + if (canFit(resReqInfo.remoteRequest.getProfileCapability(), + capability) && !resReqInfo.containerRequests.isEmpty()) { list.add(resReqInfo.containerRequests); } } } } // no match found - return list; + return list; } private Set resolveRacks(List nodes) { @@ -758,6 +780,15 @@ public class AMRMClientImpl extends AMRMClient { } } } + + private void checkResourceProfile(String profile) { + if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty() + && !resourceProfilesMap.containsKey(profile)) { + throw new InvalidContainerRequestException( + "Invalid profile name, valid profile names are " + resourceProfilesMap + .keySet()); + } + } /** * Valid if a node label expression specified on container request is valid or @@ -845,12 +876,16 @@ public class AMRMClientImpl extends AMRMClient { } private void addResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req, + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req, boolean relaxLocality, String labelExpression) { RemoteRequestsTable remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable == null) { remoteRequestsTable = new RemoteRequestsTable(); + if (this.resourceProfilesMap instanceof HashMap) { + remoteRequestsTable.setResourceComparator( + new ProfileCapabilityComparator((HashMap) resourceProfilesMap)); + } putTable(req.getAllocationRequestId(), remoteRequestsTable); } @SuppressWarnings("unchecked") @@ -863,6 +898,7 @@ public class AMRMClientImpl extends AMRMClient { addResourceRequestToAsk(resourceRequestInfo.remoteRequest); if (LOG.isDebugEnabled()) { + LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest); LOG.debug("addResourceRequest:" + " applicationId=" + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" @@ -872,7 +908,7 @@ public class AMRMClientImpl extends AMRMClient { } private void decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req) { + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { RemoteRequestsTable remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable != null) { @@ -882,7 +918,7 @@ public class AMRMClientImpl extends AMRMClient { execTypeReq, capability, req); // send the ResourceRequest to RM even if is 0 because it needs to // override a previously sent value. If ResourceRequest was not sent - // previously then sending 0 ought to be a no-op on RM + // previously then sending 0 aught to be a no-op on RM if (resourceRequestInfo != null) { addResourceRequestToAsk(resourceRequestInfo.remoteRequest); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 110ca79..135e1db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import java.util.Collection; import java.util.HashMap; @@ -35,43 +35,42 @@ import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo; -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator; class RemoteRequestsTable implements Iterable{ private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class); - static ResourceReverseMemoryThenCpuComparator resourceComparator = - new ResourceReverseMemoryThenCpuComparator(); + private ProfileCapabilityComparator resourceComparator; /** * Nested Iterator that iterates over just the ResourceRequestInfo * object. */ class RequestInfoIterator implements Iterator { - private Iterator>>> iLocMap; - private Iterator>> iExecTypeMap; - private Iterator> iCapMap; + private Iterator> iCapMap; private Iterator iResReqInfo; public RequestInfoIterator(Iterator>>> + Map>>> iLocationMap) { this.iLocMap = iLocationMap; if (iLocMap.hasNext()) { iExecTypeMap = iLocMap.next().values().iterator(); } else { iExecTypeMap = - new LinkedList>>().iterator(); } if (iExecTypeMap.hasNext()) { iCapMap = iExecTypeMap.next().values().iterator(); } else { iCapMap = - new LinkedList>() + new LinkedList>() .iterator(); } if (iCapMap.hasNext()) { @@ -113,7 +112,7 @@ class RemoteRequestsTable implements Iterable{ // Nest map with Primary key : // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource) // and value : ResourceRequestInfo - private Map>>> remoteRequestsTable = new HashMap<>(); @Override @@ -122,8 +121,8 @@ class RemoteRequestsTable implements Iterable{ } ResourceRequestInfo get(Priority priority, String location, - ExecutionType execType, Resource capability) { - TreeMap capabilityMap = + ExecutionType execType, ProfileCapability capability) { + TreeMap capabilityMap = getCapabilityMap(priority, location, execType); if (capabilityMap == null) { return null; @@ -131,9 +130,10 @@ class RemoteRequestsTable implements Iterable{ return capabilityMap.get(capability); } + @SuppressWarnings("unchecked") void put(Priority priority, String resourceName, ExecutionType execType, - Resource capability, ResourceRequestInfo resReqInfo) { - Map>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { @@ -143,8 +143,8 @@ class RemoteRequestsTable implements Iterable{ LOG.debug("Added priority=" + priority); } } - Map> execTypeMap = - locationMap.get(resourceName); + Map> + execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { execTypeMap = new HashMap<>(); locationMap.put(resourceName, execTypeMap); @@ -152,9 +152,14 @@ class RemoteRequestsTable implements Iterable{ LOG.debug("Added resourceName=" + resourceName); } } - TreeMap capabilityMap = + TreeMap capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { + // this can happen if the user doesn't register with the RM before + // calling addResourceRequest + if (resourceComparator == null) { + resourceComparator = new ProfileCapabilityComparator(new HashMap<>()); + } capabilityMap = new TreeMap<>(resourceComparator); execTypeMap.put(execType, capabilityMap); if (LOG.isDebugEnabled()) { @@ -165,9 +170,9 @@ class RemoteRequestsTable implements Iterable{ } ResourceRequestInfo remove(Priority priority, String resourceName, - ExecutionType execType, Resource capability) { + ExecutionType execType, ProfileCapability capability) { ResourceRequestInfo retVal = null; - Map>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { if (LOG.isDebugEnabled()) { @@ -175,7 +180,7 @@ class RemoteRequestsTable implements Iterable{ } return null; } - Map> + Map> execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { if (LOG.isDebugEnabled()) { @@ -183,7 +188,7 @@ class RemoteRequestsTable implements Iterable{ } return null; } - TreeMap capabilityMap = + TreeMap capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { if (LOG.isDebugEnabled()) { @@ -204,14 +209,14 @@ class RemoteRequestsTable implements Iterable{ return retVal; } - Map>> getLocationMap(Priority priority) { return remoteRequestsTable.get(priority); } - Map> + Map> getExecutionTypeMap(Priority priority, String location) { - Map>> locationMap = getLocationMap(priority); if (locationMap == null) { return null; @@ -219,10 +224,10 @@ class RemoteRequestsTable implements Iterable{ return locationMap.get(location); } - TreeMap getCapabilityMap(Priority + TreeMap getCapabilityMap(Priority priority, String location, ExecutionType execType) { - Map> + Map> executionTypeMap = getExecutionTypeMap(priority, location); if (executionTypeMap == null) { return null; @@ -236,7 +241,7 @@ class RemoteRequestsTable implements Iterable{ List retList = new LinkedList<>(); for (String location : locations) { for (ExecutionType eType : ExecutionType.values()) { - TreeMap capabilityMap = + TreeMap capabilityMap = getCapabilityMap(priority, location, eType); if (capabilityMap != null) { retList.addAll(capabilityMap.values()); @@ -248,9 +253,9 @@ class RemoteRequestsTable implements Iterable{ List getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, - Resource capability) { + ProfileCapability capability) { List list = new LinkedList<>(); - TreeMap capabilityMap = + TreeMap capabilityMap = getCapabilityMap(priority, resourceName, executionType); if (capabilityMap != null) { ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability); @@ -266,14 +271,15 @@ class RemoteRequestsTable implements Iterable{ @SuppressWarnings("unchecked") ResourceRequestInfo addResourceRequest(Long allocationRequestId, Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, - Resource capability, T req, boolean relaxLocality, + ProfileCapability capability, T req, boolean relaxLocality, String labelExpression) { - ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, - execTypeReq.getExecutionType(), capability); + ResourceRequestInfo resourceRequestInfo = + get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { resourceRequestInfo = new ResourceRequestInfo(allocationRequestId, priority, resourceName, - capability, relaxLocality); + capability.getProfileCapabilityOverride(), relaxLocality, + capability.getProfileName()); put(priority, resourceName, execTypeReq.getExecutionType(), capability, resourceRequestInfo); } @@ -288,11 +294,14 @@ class RemoteRequestsTable implements Iterable{ if (ResourceRequest.ANY.equals(resourceName)) { resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); } + if (LOG.isDebugEnabled()) { + LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest); + } return resourceRequestInfo; } ResourceRequestInfo decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req) { + ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); @@ -330,4 +339,34 @@ class RemoteRequestsTable implements Iterable{ return remoteRequestsTable.isEmpty(); } + @SuppressWarnings("unchecked") + public void setResourceComparator(ProfileCapabilityComparator comparator) { + ProfileCapabilityComparator old = this.resourceComparator; + this.resourceComparator = comparator; + if (old != null) { + // we've already set a resource comparator - re-create the maps with the + // new one. this is needed in case someone adds container requests before + // registering with the RM. In such a case, the comparator won't have + // the resource profiles map. After registration, the map is available + // so re-create the capabilities maps + + for (Map.Entry>>> + priEntry : remoteRequestsTable.entrySet()) { + for (Map.Entry>> nameEntry : priEntry.getValue().entrySet()) { + for (Map.Entry> execEntry : nameEntry + .getValue().entrySet()) { + Map capabilityMap = + execEntry.getValue(); + TreeMap newCapabiltyMap = + new TreeMap<>(resourceComparator); + newCapabiltyMap.putAll(capabilityMap); + execEntry.setValue(newCapabiltyMap); + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 09b12f2..662271a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -130,11 +130,13 @@ public class TestAMRMClient { @Before public void setup() throws Exception { conf = new YarnConfiguration(); - createClusterAndStartApplication(); + createClusterAndStartApplication(conf); } - private void createClusterAndStartApplication() throws Exception { + private void createClusterAndStartApplication(Configuration conf) + throws Exception { // start minicluster + this.conf = conf; conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.setLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -536,7 +538,8 @@ public class TestAMRMClient { } @Test (timeout=60000) - public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { + public void testAMRMClientMatchingFitInferredRack() + throws YarnException, IOException { AMRMClientImpl amClient = null; try { // start am rm client @@ -544,10 +547,10 @@ public class TestAMRMClient { amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + Resource capability = Resource.newInstance(1024, 2); - ContainerRequest storedContainer1 = + ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, null, priority); amClient.addContainerRequest(storedContainer1); @@ -564,14 +567,15 @@ public class TestAMRMClient { verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); - + // inferred rack match no longer valid after request is removed amClient.removeContainerRequest(storedContainer1); matches = amClient.getMatchingRequests(priority, rack, capability); assertTrue(matches.isEmpty()); - - amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); + + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); } finally { if (amClient != null && amClient.getServiceState() == STATE.STARTED) { @@ -604,16 +608,19 @@ public class TestAMRMClient { amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer3); + + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); // test addition and storage RemoteRequestsTable remoteRequestsTable = amClient.getTable(0); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); containersRequestedAny = remoteRequestsTable.get(priority1, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); List> matches = @@ -884,7 +891,7 @@ public class TestAMRMClient { teardown(); conf = new YarnConfiguration(); conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy"); - createClusterAndStartApplication(); + createClusterAndStartApplication(conf); initAMRMClientAndTest(false); } @@ -1701,14 +1708,16 @@ public class TestAMRMClient { int expAsks, int expRelease) { RemoteRequestsTable remoteRequestsTable = amClient.getTable(allocationReqId); + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(expNode, containersRequestedNode); @@ -1906,4 +1915,106 @@ public class TestAMRMClient { } return result; } + + @Test(timeout = 60000) + public void testGetMatchingFitWithProfiles() throws Exception { + teardown(); + conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); + createClusterAndStartApplication(conf); + AMRMClient amClient = null; + try { + // start am rm client + amClient = AMRMClient.createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + ProfileCapability capability1 = ProfileCapability.newInstance("minimum"); + ProfileCapability capability2 = ProfileCapability.newInstance("default"); + ProfileCapability capability3 = ProfileCapability.newInstance("maximum"); + ProfileCapability capability4 = ProfileCapability + .newInstance("minimum", Resource.newInstance(2048, 1)); + ProfileCapability capability5 = ProfileCapability.newInstance("default"); + ProfileCapability capability6 = ProfileCapability + .newInstance("default", Resource.newInstance(2048, 1)); + // http has the same capabilities as default + ProfileCapability capability7 = ProfileCapability.newInstance("http"); + + ContainerRequest storedContainer1 = + new ContainerRequest(capability1, nodes, racks, priority); + ContainerRequest storedContainer2 = + new ContainerRequest(capability2, nodes, racks, priority); + ContainerRequest storedContainer3 = + new ContainerRequest(capability3, nodes, racks, priority); + ContainerRequest storedContainer4 = + new ContainerRequest(capability4, nodes, racks, priority); + ContainerRequest storedContainer5 = + new ContainerRequest(capability5, nodes, racks, priority2); + ContainerRequest storedContainer6 = + new ContainerRequest(capability6, nodes, racks, priority); + ContainerRequest storedContainer7 = + new ContainerRequest(capability7, nodes, racks, priority); + + + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + amClient.addContainerRequest(storedContainer3); + amClient.addContainerRequest(storedContainer4); + amClient.addContainerRequest(storedContainer5); + amClient.addContainerRequest(storedContainer6); + amClient.addContainerRequest(storedContainer7); + + // test matching of containers + List> matches; + ContainerRequest storedRequest; + // exact match + ProfileCapability testCapability1 = + ProfileCapability.newInstance("minimum"); + matches = amClient + .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, + testCapability1); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertEquals(storedContainer1, storedRequest); + amClient.removeContainerRequest(storedContainer1); + + // exact matching with order maintained + // we should get back 3 matches - default + http because they have the + // same capability + ProfileCapability testCapability2 = + ProfileCapability.newInstance("default"); + matches = amClient + .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, + testCapability2); + verifyMatches(matches, 2); + // must be returned in the order they were made + int i = 0; + for (ContainerRequest storedRequest1 : matches.get(0)) { + switch(i) { + case 0: + assertEquals(storedContainer2, storedRequest1); + break; + case 1: + assertEquals(storedContainer7, storedRequest1); + break; + } + i++; + } + amClient.removeContainerRequest(storedContainer5); + + // matching with larger container. all requests returned + Resource testCapability3 = Resource.newInstance(8192, 8); + matches = amClient + .getMatchingRequests(priority, node, testCapability3); + assertEquals(3, matches.size()); + + Resource testCapability4 = Resource.newInstance(2048, 1); + matches = amClient.getMatchingRequests(priority, node, testCapability4); + assertEquals(1, matches.size()); + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index ad18da3..53e70ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -274,9 +275,10 @@ public class TestAMRMClientContainerRequest { AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality, ExecutionType executionType) { - ResourceRequest ask = client.getTable(0) - .get(request.getPriority(), location, executionType, - request.getCapability()).remoteRequest; + ProfileCapability profileCapability = ProfileCapability + .newInstance(request.getResourceProfile(), request.getCapability()); + ResourceRequest ask = client.getTable(0).get(request.getPriority(), + location, executionType, profileCapability).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index e180f6d..00f5e03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; @@ -387,18 +388,21 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { RemoteRequestsTable remoteRequestsTable = amClient.getTable(0); + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); + int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = remoteRequestsTable.get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(2, containersRequestedNode); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 9b79e2d..ddabd17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -255,9 +256,11 @@ public class TestNMClient { racks, priority)); } + ProfileCapability profileCapability = + ProfileCapability.newInstance(capability); int containersRequestedAny = rmClient.getTable(0) .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, - capability).remoteRequest.getNumContainers(); + profileCapability).remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index 305d18b..12c32fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; @@ -99,6 +100,7 @@ public class TestOpportunisticContainerAllocation { private static final long AM_EXPIRE_MS = 4000; private static Resource capability; + private static ProfileCapability profileCapability; private static Priority priority; private static Priority priority2; private static Priority priority3; @@ -151,6 +153,7 @@ public class TestOpportunisticContainerAllocation { priority3 = Priority.newInstance(3); priority4 = Priority.newInstance(4); capability = Resource.newInstance(512, 1); + profileCapability = ProfileCapability.newInstance(capability); node = nodeReports.get(0).getNodeId().getHost(); rack = nodeReports.get(0).getRackName(); @@ -273,7 +276,7 @@ public class TestOpportunisticContainerAllocation { int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(1, oppContainersRequestedAny); @@ -394,7 +397,7 @@ public class TestOpportunisticContainerAllocation { new AMRMClient.ContainerRequest(capability, null, null, priority3)); int guarContainersRequestedAny = amClient.getTable(0).get(priority3, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); assertEquals(1, guarContainersRequestedAny); @@ -512,6 +515,7 @@ public class TestOpportunisticContainerAllocation { assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); + amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.addContainerRequest( @@ -532,17 +536,17 @@ public class TestOpportunisticContainerAllocation { ExecutionType.OPPORTUNISTIC, true))); int containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); int containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(4, containersRequestedNode); @@ -564,17 +568,17 @@ public class TestOpportunisticContainerAllocation { ExecutionType.OPPORTUNISTIC, true))); containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest .getNumContainers(); containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) .remoteRequest.getNumContainers(); oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest .getNumContainers(); assertEquals(2, containersRequestedNode); @@ -691,10 +695,9 @@ public class TestOpportunisticContainerAllocation { ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); - int oppContainersRequestedAny = - amClient.getTable(0).get(priority3, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, capability).remoteRequest - .getNumContainers(); + int oppContainersRequestedAny = amClient.getTable(0) + .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, + profileCapability).remoteRequest.getNumContainers(); assertEquals(2, oppContainersRequestedAny); http://git-wip-us.apache.org/repos/asf/hadoop/blob/02fb8fb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json new file mode 100644 index 0000000..d0f3f72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json @@ -0,0 +1,18 @@ +{ + "minimum": { + "memory-mb" : 1024, + "vcores" : 1 + }, + "default" : { + "memory-mb" : 2048, + "vcores" : 2 + }, + "maximum" : { + "memory-mb": 4096, + "vcores" : 4 + }, + "http" : { + "memory-mb" : 2048, + "vcores" : 2 + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org