hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [2/2] hadoop git commit: YARN-5587. Add support for resource profiles. (vvasudev via asuresh)
Date Tue, 15 Nov 2016 09:01:27 GMT
YARN-5587. Add support for resource profiles. (vvasudev via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa982d22
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa982d22
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa982d22

Branch: refs/heads/YARN-3926
Commit: aa982d220c2a58f6c7bf21c4d5a9bc514977f347
Parents: ea53187
Author: Arun Suresh <asuresh@apache.org>
Authored: Tue Nov 15 01:01:07 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Tue Nov 15 01:01:07 2016 -0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   4 +
 .../RegisterApplicationMasterResponse.java      |   8 +
 .../yarn/api/records/ProfileCapability.java     |  94 ++++++++++-
 .../hadoop/yarn/api/records/Resource.java       |  14 ++
 .../yarn/api/records/ResourceInformation.java   |  57 ++++++-
 .../yarn/api/records/ResourceRequest.java       |  25 ++-
 .../hadoop-yarn/hadoop-yarn-client/pom.xml      |   1 +
 .../hadoop/yarn/client/api/AMRMClient.java      |  77 ++++++++-
 .../yarn/client/api/impl/AMRMClientImpl.java    | 133 +++++++++------
 .../client/api/impl/RemoteRequestsTable.java    | 108 +++++++++----
 .../yarn/client/api/impl/TestAMRMClient.java    | 152 +++++++++++++++--
 .../impl/TestAMRMClientContainerRequest.java    |   5 +-
 .../api/impl/TestDistributedScheduling.java     |  12 +-
 .../yarn/client/api/impl/TestNMClient.java      |   5 +-
 .../src/test/resources/resource-profiles.json   |  18 +++
 ...RegisterApplicationMasterResponsePBImpl.java |  58 +++++++
 .../api/records/impl/pb/ResourcePBImpl.java     |   4 +-
 .../records/impl/pb/ResourceRequestPBImpl.java  |  41 ++++-
 .../yarn/util/resource/ResourceUtils.java       | 161 ++++++++++++++++++-
 .../hadoop/yarn/util/resource/Resources.java    |  10 +-
 .../ApplicationMasterService.java               |  12 ++
 .../server/resourcemanager/RMServerUtils.java   |  51 ++++++
 .../resource/ResourceProfilesManagerImpl.java   |   4 +
 .../scheduler/AbstractYarnScheduler.java        |  44 +++++
 .../scheduler/ClusterNodeTracker.java           |   3 +-
 .../scheduler/SchedulerUtils.java               |  10 ++
 .../scheduler/capacity/CapacityScheduler.java   |   4 +-
 .../scheduler/fair/FairScheduler.java           |   4 +-
 .../scheduler/fifo/FifoScheduler.java           |  13 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +
 .../server/resourcemanager/TestAppManager.java  |   1 +
 .../TestApplicationMasterService.java           |  35 ++++
 .../scheduler/fair/TestFairScheduler.java       |   4 +
 .../hadoop/yarn/server/MiniYARNCluster.java     |   2 +
 34 files changed, 1025 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7c19c5e..f138079 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -151,6 +151,10 @@
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
   <Match>
+    <Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+  <Match>
     <Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
     <Field name="builder" />
     <Bug pattern="SE_BAD_FIELD" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 1a51ba6..706a75b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -202,4 +202,12 @@ public abstract class RegisterApplicationMasterResponse {
   @Unstable
   public abstract void setSchedulerResourceTypes(
       EnumSet<SchedulerResourceTypes> types);
+
+  @Public
+  @Unstable
+  public abstract Map<String, Resource> getResourceProfiles();
+
+  @Private
+  @Unstable
+  public abstract void setResourceProfiles(Map<String, Resource> profiles);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
index 0a93b89..faaddd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
@@ -18,41 +18,93 @@
 
 package org.apache.hadoop.yarn.api.records;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.util.Records;
 
+import java.util.Map;
+
 /**
  * Class to capture capability requirements when using resource profiles. The
  * ProfileCapability is meant to be used as part of the ResourceRequest. A
  * profile capability has two pieces - the resource profile name and the
  * overrides. The resource profile specifies the name of the resource profile
  * to be used and the capability override is the overrides desired on specific
- * resource types. For example, you could use the "minimum" profile and set the
- * memory in the capability override to 4096M. This implies that you wish for
- * the resources specified in the "minimum" profile but with 4096M memory. The
- * conversion from the ProfileCapability to the Resource class with the actual
- * resource requirements will be done by the ResourceManager, which has the
- * actual profile to Resource mapping.
+ * resource types.
+ *
+ * For example, if you have a resource profile "small" that maps to
+ * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+ * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+ * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+ *
+ * Note that the conversion from the ProfileCapability to the Resource class
+ * with the actual resource requirements will be done by the ResourceManager,
+ * which has the actual profile to Resource mapping.
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public abstract class ProfileCapability {
 
+  public static final String DEFAULT_PROFILE = "default";
+
+  public static ProfileCapability newInstance(Resource override) {
+    return newInstance(DEFAULT_PROFILE, override);
+  }
+
+  public static ProfileCapability newInstance(String profile) {
+    Preconditions
+        .checkArgument(profile != null, "The profile name cannot be null");
+    ProfileCapability obj = Records.newRecord(ProfileCapability.class);
+    obj.setProfileName(profile);
+    obj.setProfileCapabilityOverride(Resource.newInstance(0, 0));
+    return obj;
+  }
+
   public static ProfileCapability newInstance(String profile,
       Resource override) {
+    Preconditions
+        .checkArgument(profile != null, "The profile name cannot be null");
     ProfileCapability obj = Records.newRecord(ProfileCapability.class);
     obj.setProfileName(profile);
     obj.setProfileCapabilityOverride(override);
     return obj;
   }
 
+  /**
+   * Get the profile name.
+   * @return the profile name
+   */
   public abstract String getProfileName();
 
+  /**
+   * Get the profile capability override.
+   * @return Resource object containing the override.
+   */
   public abstract Resource getProfileCapabilityOverride();
 
+  /**
+   * Set the resource profile name.
+   * @param profileName the resource profile name
+   */
   public abstract void setProfileName(String profileName);
 
+  /**
+   * Set the capability override to override specific resource types on the
+   * resource profile.
+   *
+   * For example, if you have a resource profile "small" that maps to
+   * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+   * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+   * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+   *
+   * Note that the conversion from the ProfileCapability to the Resource class
+   * with the actual resource requirements will be done by the ResourceManager,
+   * which has the actual profile to Resource mapping.
+   *
+   * @param r Resource object containing the capability override
+   */
   public abstract void setProfileCapabilityOverride(Resource r);
 
   @Override
@@ -85,4 +137,34 @@ public abstract class ProfileCapability {
     return "{ profile: " + this.getProfileName() + ", capabilityOverride: "
         + this.getProfileCapabilityOverride() + " }";
   }
+
+  /**
+   * Get a representation of the capability as a Resource object.
+   * @param capability the capability we wish to convert
+   * @param resourceProfilesMap map of profile name to Resource object
+   * @return Resource object representing the capability
+   */
+  public static Resource toResource(ProfileCapability capability,
+      Map<String, Resource> resourceProfilesMap) {
+    Preconditions
+        .checkArgument(capability != null, "Capability cannot be null");
+    Preconditions.checkArgument(resourceProfilesMap != null,
+        "Resource profiles map cannot be null");
+    Resource resource = Resource.newInstance(0, 0);
+
+    if (resourceProfilesMap.containsKey(capability.getProfileName())) {
+      resource = Resource
+          .newInstance(resourceProfilesMap.get(capability.getProfileName()));
+    }
+
+    if(capability.getProfileCapabilityOverride()!= null) {
+      for (Map.Entry<String, ResourceInformation> entry : capability
+          .getProfileCapabilityOverride().getResources().entrySet()) {
+        if (entry.getValue() != null && entry.getValue().getValue() != 0) {
+          resource.setResourceInformation(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return resource;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 4bbca5a..103bcfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.api.records;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -74,6 +76,18 @@ public abstract class Resource implements Comparable<Resource> {
     return resource;
   }
 
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static Resource newInstance(Resource resource) {
+    Resource ret = Resource.newInstance(0, 0);
+    for (Map.Entry<String, ResourceInformation> entry : resource.getResources()
+        .entrySet()) {
+      ret.setResourceInformation(entry.getKey(),
+          ResourceInformation.newInstance(entry.getValue()));
+    }
+    return ret;
+  }
+
   /**
    * This method is DEPRECATED:
    * Use {@link Resource#getMemorySize()} instead

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index a17e81b..7d74efc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   private String units;
   private ResourceTypes resourceType;
   private Long value;
+  private Long minimumAllocation;
+  private Long maximumAllocation;
 
   private static final String MEMORY_URI = "memory-mb";
   private static final String VCORES_URI = "vcores";
@@ -118,6 +120,42 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   }
 
   /**
+   * Get the minimum allocation for the resource.
+   *
+   * @return the minimum allocation for the resource
+   */
+  public Long getMinimumAllocation() {
+    return minimumAllocation;
+  }
+
+  /**
+   * Set the minimum allocation for the resource.
+   *
+   * @param minimumAllocation the minimum allocation for the resource
+   */
+  public void setMinimumAllocation(Long minimumAllocation) {
+    this.minimumAllocation = minimumAllocation;
+  }
+
+  /**
+   * Get the maximum allocation for the resource.
+   *
+   * @return the maximum allocation for the resource
+   */
+  public Long getMaximumAllocation() {
+    return maximumAllocation;
+  }
+
+  /**
+   * Set the maximum allocation for the resource.
+   *
+   * @param maximumAllocation the maximum allocation for the resource
+   */
+  public void setMaximumAllocation(Long maximumAllocation) {
+    this.maximumAllocation = maximumAllocation;
+  }
+
+  /**
    * Create a new instance of ResourceInformation from another object.
    *
    * @param other the object from which the new object should be created
@@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
     ret.setResourceType(other.getResourceType());
     ret.setUnits(other.getUnits());
     ret.setValue(other.getValue());
+    ret.setMinimumAllocation(other.getMinimumAllocation());
+    ret.setMaximumAllocation(other.getMaximumAllocation());
     return ret;
   }
 
   public static ResourceInformation newInstance(String name, String units,
-      Long value, ResourceTypes type) {
+      Long value, ResourceTypes type, Long minimumAllocation,
+      Long maximumAllocation) {
     ResourceInformation ret = new ResourceInformation();
     ret.setName(name);
     ret.setResourceType(type);
     ret.setUnits(units);
     ret.setValue(value);
+    ret.setMinimumAllocation(minimumAllocation);
+    ret.setMaximumAllocation(maximumAllocation);
     return ret;
   }
 
   public static ResourceInformation newInstance(String name, String units,
       Long value) {
     return ResourceInformation
-        .newInstance(name, units, value, ResourceTypes.COUNTABLE);
+        .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name, String units) {
     return ResourceInformation
-        .newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
+        .newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name, Long value) {
     return ResourceInformation
-        .newInstance(name, "", value, ResourceTypes.COUNTABLE);
+        .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name) {
@@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   @Override
   public String toString() {
     return "name: " + this.name + ", units: " + this.units + ", type: "
-        + resourceType + ", value: " + value;
+        + resourceType + ", value: " + value + ", minimum allocation: "
+        + minimumAllocation + ", maximum allocation: " + maximumAllocation;
   }
 
   public String getShorthandRepresentation() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 50a7619..ca495d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -86,8 +86,18 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
   @Public
   @Evolving
   public static ResourceRequest newInstance(Priority priority, String hostName,
-      Resource capability, int numContainers, boolean relaxLocality, String
-      labelExpression, ExecutionTypeRequest executionTypeRequest) {
+      Resource capability, int numContainers, boolean relaxLocality,
+      String labelExpression, ExecutionTypeRequest executionTypeRequest) {
+    return newInstance(priority, hostName, capability, numContainers,
+        relaxLocality, labelExpression, executionTypeRequest, null);
+  }
+
+  @Public
+  @Evolving
+  public static ResourceRequest newInstance(Priority priority, String hostName,
+      Resource capability, int numContainers, boolean relaxLocality,
+      String labelExpression, ExecutionTypeRequest executionTypeRequest,
+      ProfileCapability profile) {
     ResourceRequest request = Records.newRecord(ResourceRequest.class);
     request.setPriority(priority);
     request.setResourceName(hostName);
@@ -96,6 +106,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
     request.setRelaxLocality(relaxLocality);
     request.setNodeLabelExpression(labelExpression);
     request.setExecutionTypeRequest(executionTypeRequest);
+    request.setProfileCapability(profile);
     return request;
   }
 
@@ -313,6 +324,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
   @Evolving
   public abstract void setNodeLabelExpression(String nodelabelExpression);
 
+  @Public
+  @Evolving
+  public abstract ProfileCapability getProfileCapability();
+
+  @Public
+  @Evolving
+  public abstract void setProfileCapability(ProfileCapability p);
+
   /**
    * Get the optional <em>ID</em> corresponding to this allocation request. This
    * ID is an identifier for different {@code ResourceRequest}s from the <b>same
@@ -372,11 +391,13 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
     Resource capability = getCapability();
     String hostName = getResourceName();
     Priority priority = getPriority();
+    ProfileCapability profile = getProfileCapability();
     result =
         prime * result + ((capability == null) ? 0 : capability.hashCode());
     result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
     result = prime * result + getNumContainers();
     result = prime * result + ((priority == null) ? 0 : priority.hashCode());
+    result = prime * result + ((profile == null) ? 0 : profile.hashCode());
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 668c50d..7011cdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -145,6 +145,7 @@
             <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
             <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
             <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
+            <exclude>src/test/resources/resource-profiles.json</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 7acaf11..e9d3755 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -113,6 +114,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     final boolean relaxLocality;
     final String nodeLabelsExpression;
     final ExecutionTypeRequest executionTypeRequest;
+    final String resourceProfile;
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints and
@@ -134,6 +136,11 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
         String[] racks, Priority priority) {
       this(capability, nodes, racks, priority, true, null);
     }
+
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority) {
+      this(capability, nodes, racks, priority, true, null);
+    }
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -158,6 +165,11 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this(capability, nodes, racks, priority, relaxLocality, null);
     }
 
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority, boolean relaxLocality) {
+      this(capability, nodes, racks, priority, relaxLocality, null);
+    }
+
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
      *
@@ -185,6 +197,53 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
           nodeLabelsExpression,
           ExecutionTypeRequest.newInstance());
     }
+
+    public ContainerRequest(ProfileCapability capability, String[] nodes, String[] racks,
+        Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
+      this(capability, nodes, racks, priority, relaxLocality,
+          nodeLabelsExpression,
+          ExecutionTypeRequest.newInstance());
+    }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     *
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     * @param nodeLabelsExpression
+     *          Set node labels to allocate resource, now we only support
+     *          asking for only a single node label
+     * @param executionTypeRequest
+     *          Set the execution type of the container request.
+     */
+    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
+        Priority priority, boolean relaxLocality, String nodeLabelsExpression,
+        ExecutionTypeRequest executionTypeRequest) {
+      this(capability, nodes, racks, priority, relaxLocality,
+          nodeLabelsExpression, executionTypeRequest,
+          ProfileCapability.DEFAULT_PROFILE);
+    }
+
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority, boolean relaxLocality,
+        String nodeLabelsExpression,
+        ExecutionTypeRequest executionTypeRequest) {
+      this(capability.getProfileCapabilityOverride(), nodes, racks, priority,
+          relaxLocality, nodeLabelsExpression, executionTypeRequest,
+          capability.getProfileName());
+    }
           
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -208,10 +267,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      *          asking for only a single node label
      * @param executionTypeRequest
      *          Set the execution type of the container request.
+     * @param profile
+     *          Set the resource profile for the container request
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
         Priority priority, boolean relaxLocality, String nodeLabelsExpression,
-        ExecutionTypeRequest executionTypeRequest) {
+        ExecutionTypeRequest executionTypeRequest, String profile) {
       // Validate request
       Preconditions.checkArgument(capability != null,
           "The Resource to be requested for each container " +
@@ -230,6 +291,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this.relaxLocality = relaxLocality;
       this.nodeLabelsExpression = nodeLabelsExpression;
       this.executionTypeRequest = executionTypeRequest;
+      this.resourceProfile = profile;
     }
     
     public Resource getCapability() {
@@ -260,12 +322,17 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       return executionTypeRequest;
     }
 
+    public String getResourceProfile() {
+      return resourceProfile;
+    }
+
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
       sb.append("Priority[").append(priority).append("]");
       sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
           .append("]");
+      sb.append("Resource Profile[").append(resourceProfile).append("]");
       return sb.toString();
     }
   }
@@ -421,6 +488,14 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     throw new UnsupportedOperationException("The sub-class extending" +
         " AMRMClient is expected to implement this !!");
   }
+
+  @InterfaceStability.Evolving
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      ProfileCapability capability) {
+    throw new UnsupportedOperationException("The sub-class extending" +
+        " AMRMClient is expected to implement this !!");
+  }
   
   /**
    * Update application's blacklist with addition or removal resources.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4145944..1e3bad2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -103,55 +104,55 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistedNodes = new HashSet<String>();
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
+
+  protected Map<String, Resource> resourceProfilesMap;
   
   static class ResourceRequestInfo<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
-    
+
     ResourceRequestInfo(Priority priority, String resourceName,
-        Resource capability, boolean relaxLocality) {
+        Resource capability, boolean relaxLocality, String resourceProfile) {
       remoteRequest = ResourceRequest.newInstance(priority, resourceName,
           capability, 0);
       remoteRequest.setRelaxLocality(relaxLocality);
+      ProfileCapability profileCapability = ProfileCapability
+          .newInstance(resourceProfile, capability);
+      remoteRequest.setProfileCapability(profileCapability);
       containerRequests = new LinkedHashSet<T>();
     }
   }
 
   /**
-   * Class compares Resource by memory then cpu in reverse order
+   * Class compares Resource by memory, then cpu and then the remaining resource
+   * types in reverse order.
    */
-  static class ResourceReverseMemoryThenCpuComparator implements
-      Comparator<Resource>, Serializable {
-    static final long serialVersionUID = 12345L;
-    @Override
-    public int compare(Resource arg0, Resource arg1) {
-      long mem0 = arg0.getMemorySize();
-      long mem1 = arg1.getMemorySize();
-      long cpu0 = arg0.getVirtualCores();
-      long cpu1 = arg1.getVirtualCores();
-      if(mem0 == mem1) {
-        if(cpu0 == cpu1) {
-          return 0;
-        }
-        if(cpu0 < cpu1) {
-          return 1;
-        }
-        return -1;
-      }
-      if(mem0 < mem1) { 
-        return 1;
-      }
-      return -1;
-    }    
+  static class ProfileCapabilityComparator<T extends ProfileCapability>
+      implements Comparator<T> {
+
+    HashMap<String, Resource> resourceProfilesMap;
+
+    public ProfileCapabilityComparator(
+        HashMap<String, Resource> resourceProfileMap) {
+      this.resourceProfilesMap = resourceProfileMap;
+    }
+
+    public int compare(T arg0, T arg1) {
+      Resource resource0 =
+          ProfileCapability.toResource(arg0, resourceProfilesMap);
+      Resource resource1 =
+          ProfileCapability.toResource(arg1, resourceProfilesMap);
+      return resource1.compareTo(resource0);
+    }
   }
 
-  static boolean canFit(Resource arg0, Resource arg1) {
-    long mem0 = arg0.getMemorySize();
-    long mem1 = arg1.getMemorySize();
-    long cpu0 = arg0.getVirtualCores();
-    long cpu1 = arg1.getVirtualCores();
-    
-    return (mem0 <= mem1 && cpu0 <= cpu1);
+  boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
+    Resource resource0 =
+        ProfileCapability.toResource(arg0, resourceProfilesMap);
+    Resource resource1 =
+        ProfileCapability.toResource(arg1, resourceProfilesMap);
+    return Resources.fitsIn(resource0, resource1);
+
   }
 
   final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
@@ -228,6 +229,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return registerApplicationMaster();
   }
 
+  @SuppressWarnings("unchecked")
   private RegisterApplicationMasterResponse registerApplicationMaster()
       throws YarnException, IOException {
     RegisterApplicationMasterRequest request =
@@ -240,6 +242,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
         populateNMTokens(response.getNMTokensFromPreviousAttempts());
       }
+      this.resourceProfilesMap = response.getResourceProfiles();
+      if (this.resourceProfilesMap instanceof HashMap) {
+        this.remoteRequestsTable.setResourceComparator(
+            new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
+      } else {
+        throw new IOException("Unable to set resource comparator for TreeMap");
+      }
     }
     return response;
   }
@@ -266,7 +275,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           askList.add(ResourceRequest.newInstance(r.getPriority(),
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
               r.getRelaxLocality(), r.getNodeLabelExpression(),
-              r.getExecutionTypeRequest()));
+              r.getExecutionTypeRequest(),r.getProfileCapability()));
         }
         List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
         List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -484,6 +493,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized void addContainerRequest(T req) {
     Preconditions.checkArgument(req != null,
         "Resource request can not be null.");
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(req.getResourceProfile(), req.getCapability());
     Set<String> dedupedRacks = new HashSet<String>();
     if (req.getRacks() != null) {
       dedupedRacks.addAll(req.getRacks());
@@ -496,6 +507,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     Set<String> inferredRacks = resolveRacks(req.getNodes());
     inferredRacks.removeAll(dedupedRacks);
 
+    checkResourceProfile(req.getResourceProfile());
+
     // check that specific and non-specific requests cannot be mixed within a
     // priority
     checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
@@ -519,26 +532,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
       for (String node : dedupedNodes) {
         addResourceRequest(req.getPriority(), node,
-            req.getExecutionTypeRequest(), req.getCapability(), req, true,
+            req.getExecutionTypeRequest(), profileCapability, req, true,
             req.getNodeLabelExpression());
       }
     }
 
     for (String rack : dedupedRacks) {
       addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
-          req.getCapability(), req, true, req.getNodeLabelExpression());
+          profileCapability, req, true, req.getNodeLabelExpression());
     }
 
     // Ensure node requests are accompanied by requests for
     // corresponding rack
     for (String rack : inferredRacks) {
       addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
-          req.getCapability(), req, req.getRelaxLocality(),
+          profileCapability, req, req.getRelaxLocality(),
           req.getNodeLabelExpression());
     }
     // Off-switch
     addResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getExecutionTypeRequest(), req.getCapability(), req,
+        req.getExecutionTypeRequest(), profileCapability, req,
         req.getRelaxLocality(), req.getNodeLabelExpression());
   }
 
@@ -546,6 +559,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized void removeContainerRequest(T req) {
     Preconditions.checkArgument(req != null,
         "Resource request can not be null.");
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(req.getResourceProfile(), req.getCapability());
     Set<String> allRacks = new HashSet<String>();
     if (req.getRacks() != null) {
       allRacks.addAll(req.getRacks());
@@ -556,17 +571,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     if (req.getNodes() != null) {
       for (String node : new HashSet<String>(req.getNodes())) {
         decResourceRequest(req.getPriority(), node,
-            req.getExecutionTypeRequest(), req.getCapability(), req);
+            req.getExecutionTypeRequest(), profileCapability, req);
       }
     }
 
     for (String rack : allRacks) {
       decResourceRequest(req.getPriority(), rack,
-          req.getExecutionTypeRequest(), req.getCapability(), req);
+          req.getExecutionTypeRequest(), profileCapability, req);
     }
 
     decResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getExecutionTypeRequest(), req.getCapability(), req);
+        req.getExecutionTypeRequest(), profileCapability, req);
   }
 
   @Override
@@ -620,26 +635,36 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized List<? extends Collection<T>> getMatchingRequests(
       Priority priority, String resourceName, ExecutionType executionType,
       Resource capability) {
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
+    return getMatchingRequests(priority, resourceName, executionType,
+        profileCapability);
+  }
+
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      ProfileCapability capability) {
     Preconditions.checkArgument(capability != null,
         "The Resource to be requested should not be null ");
     Preconditions.checkArgument(priority != null,
         "The priority at which to request containers should not be null ");
     List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
-
     @SuppressWarnings("unchecked")
     List<ResourceRequestInfo<T>> matchingRequests =
         this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
             executionType, capability);
+
     // If no exact match. Container may be larger than what was requested.
     // get all resources <= capability. map is reverse sorted.
     for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
-      if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
-        !resReqInfo.containerRequests.isEmpty()) {
+      if (canFit(resReqInfo.remoteRequest.getProfileCapability(), capability) &&
+          !resReqInfo.containerRequests.isEmpty()) {
         list.add(resReqInfo.containerRequests);
       }
     }
     // no match found
-    return list;          
+    return list;
   }
   
   private Set<String> resolveRacks(List<String> nodes) {
@@ -684,6 +709,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
     }
   }
+
+  private void checkResourceProfile(String profile) {
+    if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
+        && !resourceProfilesMap.containsKey(profile)) {
+      throw new InvalidContainerRequestException(
+          "Invalid profile name, valid profile names are " + resourceProfilesMap
+              .keySet());
+    }
+  }
   
   /**
    * Valid if a node label expression specified on container request is valid or
@@ -740,17 +774,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
       boolean relaxLocality, String labelExpression) {
     @SuppressWarnings("unchecked")
     ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
-        .addResourceRequest(priority, resourceName,
-        execTypeReq, capability, req, relaxLocality, labelExpression);
+        .addResourceRequest(priority, resourceName, execTypeReq, capability,
+            req, relaxLocality, labelExpression);
 
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
       LOG.debug("addResourceRequest:" + " applicationId="
           + " priority=" + priority.getPriority()
           + " resourceName=" + resourceName + " numContainers="
@@ -760,7 +795,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   private void decResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
     @SuppressWarnings("unchecked")
     ResourceRequestInfo resourceRequestInfo =
         remoteRequestsTable.decResourceRequest(priority, resourceName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
index 853a512..9fc99be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,43 +35,42 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator;
 
 class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
 
   private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
 
-  static ResourceReverseMemoryThenCpuComparator resourceComparator =
-      new ResourceReverseMemoryThenCpuComparator();
+  private ProfileCapabilityComparator resourceComparator;
 
   /**
    * Nested Iterator that iterates over just the ResourceRequestInfo
    * object.
    */
   class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
-    private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+    private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>>> iLocMap;
-    private Iterator<Map<ExecutionType, TreeMap<Resource,
+    private Iterator<Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> iExecTypeMap;
-    private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+    private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap;
     private Iterator<ResourceRequestInfo> iResReqInfo;
 
     public RequestInfoIterator(Iterator<Map<String,
-        Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+        Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>>
         iLocationMap) {
       this.iLocMap = iLocationMap;
       if (iLocMap.hasNext()) {
         iExecTypeMap = iLocMap.next().values().iterator();
       } else {
         iExecTypeMap =
-            new LinkedList<Map<ExecutionType, TreeMap<Resource,
+            new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability,
                 ResourceRequestInfo>>>().iterator();
       }
       if (iExecTypeMap.hasNext()) {
         iCapMap = iExecTypeMap.next().values().iterator();
       } else {
         iCapMap =
-            new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+            new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>()
                 .iterator();
       }
       if (iCapMap.hasNext()) {
@@ -113,7 +112,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   // Nest map with Primary key :
   // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
   // and value : ResourceRequestInfo
-  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
       ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
 
   @Override
@@ -122,8 +121,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   }
 
   ResourceRequestInfo get(Priority priority, String location,
-      ExecutionType execType, Resource capability) {
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+      ExecutionType execType, ProfileCapability capability) {
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         getCapabilityMap(priority, location, execType);
     if (capabilityMap == null) {
       return null;
@@ -131,9 +130,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return capabilityMap.get(capability);
   }
 
+  @SuppressWarnings("unchecked")
   void put(Priority priority, String resourceName, ExecutionType execType,
-      Resource capability, ResourceRequestInfo resReqInfo) {
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+      ProfileCapability capability, ResourceRequestInfo resReqInfo) {
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap =
         remoteRequestsTable.get(priority);
     if (locationMap == null) {
@@ -143,8 +143,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
         LOG.debug("Added priority=" + priority);
       }
     }
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
-        locationMap.get(resourceName);
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
+        execTypeMap = locationMap.get(resourceName);
     if (execTypeMap == null) {
       execTypeMap = new HashMap<>();
       locationMap.put(resourceName, execTypeMap);
@@ -152,9 +152,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
         LOG.debug("Added resourceName=" + resourceName);
       }
     }
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         execTypeMap.get(execType);
     if (capabilityMap == null) {
+      // this can happen if the user doesn't register with the RM before
+      // calling addResourceRequest
+      if (resourceComparator == null) {
+        resourceComparator = new ProfileCapabilityComparator(new HashMap<>());
+      }
       capabilityMap = new TreeMap<>(resourceComparator);
       execTypeMap.put(execType, capabilityMap);
       if (LOG.isDebugEnabled()) {
@@ -165,9 +170,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   }
 
   ResourceRequestInfo remove(Priority priority, String resourceName,
-      ExecutionType execType, Resource capability) {
+      ExecutionType execType, ProfileCapability capability) {
     ResourceRequestInfo retVal = null;
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
     if (locationMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -175,7 +180,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
       }
       return null;
     }
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
         execTypeMap = locationMap.get(resourceName);
     if (execTypeMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -183,7 +188,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
       }
       return null;
     }
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         execTypeMap.get(execType);
     if (capabilityMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -204,14 +209,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return retVal;
   }
 
-  Map<String, Map<ExecutionType, TreeMap<Resource,
+  Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
       ResourceRequestInfo>>> getLocationMap(Priority priority) {
     return remoteRequestsTable.get(priority);
   }
 
-  Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+  Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
       getExecutionTypeMap(Priority priority, String location) {
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap = getLocationMap(priority);
     if (locationMap == null) {
       return null;
@@ -219,10 +224,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return locationMap.get(location);
   }
 
-  TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
+  TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority
       priority, String location,
       ExecutionType execType) {
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
         executionTypeMap = getExecutionTypeMap(priority, location);
     if (executionTypeMap == null) {
       return null;
@@ -236,7 +241,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     List retList = new LinkedList<>();
     for (String location : locations) {
       for (ExecutionType eType : ExecutionType.values()) {
-        TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
             getCapabilityMap(priority, location, eType);
         if (capabilityMap != null) {
           retList.addAll(capabilityMap.values());
@@ -248,9 +253,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
 
   List<ResourceRequestInfo> getMatchingRequests(
       Priority priority, String resourceName, ExecutionType executionType,
-      Resource capability) {
+      ProfileCapability capability) {
     List<ResourceRequestInfo> list = new LinkedList<>();
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         getCapabilityMap(priority, resourceName, executionType);
     if (capabilityMap != null) {
       ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
@@ -265,14 +270,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
 
   @SuppressWarnings("unchecked")
   ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
       boolean relaxLocality, String labelExpression) {
     ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
         execTypeReq.getExecutionType(), capability);
     if (resourceRequestInfo == null) {
-      resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability,
-              relaxLocality);
+      resourceRequestInfo = new ResourceRequestInfo(priority, resourceName,
+          capability.getProfileCapabilityOverride(), relaxLocality,
+          capability.getProfileName());
       put(priority, resourceName, execTypeReq.getExecutionType(), capability,
           resourceRequestInfo);
     }
@@ -287,11 +292,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     if (ResourceRequest.ANY.equals(resourceName)) {
       resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
+    }
     return resourceRequestInfo;
   }
 
   ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
     ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
         execTypeReq.getExecutionType(), capability);
 
@@ -329,4 +337,34 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return remoteRequestsTable.isEmpty();
   }
 
+  @SuppressWarnings("unchecked")
+  public void setResourceComparator(ProfileCapabilityComparator comparator) {
+    ProfileCapabilityComparator old = this.resourceComparator;
+    this.resourceComparator = comparator;
+    if (old != null) {
+      // we've already set a resource comparator - re-create the maps with the
+      // new one. this is needed in case someone adds container requests before
+      // registering with the RM. In such a case, the comparator won't have
+      // the resource profiles map. After registration, the map is available
+      // so re-create the capabilities maps
+
+      for (Map.Entry<Priority, Map<String, Map<ExecutionType,
+          TreeMap<ProfileCapability, ResourceRequestInfo>>>>
+          priEntry : remoteRequestsTable.entrySet()) {
+        for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability,
+            ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) {
+          for (Map.Entry<ExecutionType, TreeMap<ProfileCapability,
+              ResourceRequestInfo>> execEntry : nameEntry
+              .getValue().entrySet()) {
+            Map<ProfileCapability, ResourceRequestInfo> capabilityMap =
+                execEntry.getValue();
+            TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap =
+                new TreeMap<>(resourceComparator);
+            newCapabiltyMap.putAll(capabilityMap);
+            execEntry.setValue(newCapabiltyMap);
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 99bfca5..1d1dcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -133,7 +134,12 @@ public class TestAMRMClient {
     // set the minimum allocation so that resource decrease can go under 1024
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
-    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYARNCluster();
+  }
+
+  private static void startYARNCluster() throws Exception {
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
 
@@ -144,15 +150,15 @@ public class TestAMRMClient {
 
     // get node info
     nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-    
+
     priority = Priority.newInstance(1);
     priority2 = Priority.newInstance(2);
     capability = Resource.newInstance(1024, 1);
 
     node = nodeReports.get(0).getNodeId().getHost();
     rack = nodeReports.get(0).getRackName();
-    nodes = new String[]{ node };
-    racks = new String[]{ rack };
+    nodes = new String[] { node };
+    racks = new String[] { rack };
   }
   
   @Before
@@ -344,7 +350,8 @@ public class TestAMRMClient {
   }
   
   @Test (timeout=60000)
-  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+  public void testAMRMClientMatchingFitInferredRack()
+      throws YarnException, IOException {
     AMRMClientImpl<ContainerRequest> amClient = null;
     try {
       // start am rm client
@@ -352,10 +359,10 @@ public class TestAMRMClient {
       amClient.init(conf);
       amClient.start();
       amClient.registerApplicationMaster("Host", 10000, "");
-      
+
       Resource capability = Resource.newInstance(1024, 2);
 
-      ContainerRequest storedContainer1 = 
+      ContainerRequest storedContainer1 =
           new ContainerRequest(capability, nodes, null, priority);
       amClient.addContainerRequest(storedContainer1);
 
@@ -372,14 +379,15 @@ public class TestAMRMClient {
       verifyMatches(matches, 1);
       storedRequest = matches.get(0).iterator().next();
       assertEquals(storedContainer1, storedRequest);
-      
+
       // inferred rack match no longer valid after request is removed
       amClient.removeContainerRequest(storedContainer1);
       matches = amClient.getMatchingRequests(priority, rack, capability);
       assertTrue(matches.isEmpty());
-      
-      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-          null, null);
+
+      amClient
+          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+              null);
 
     } finally {
       if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@@ -412,14 +420,17 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer1);
       amClient.addContainerRequest(storedContainer2);
       amClient.addContainerRequest(storedContainer3);
+
+      ProfileCapability profileCapability =
+          ProfileCapability.newInstance(capability);
       
       // test addition and storage
       int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
       assertEquals(2, containersRequestedAny);
       containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
          assertEquals(1, containersRequestedAny);
       List<? extends Collection<ContainerRequest>> matches = 
@@ -921,15 +932,18 @@ public class TestAMRMClient {
         new ContainerRequest(capability, nodes, racks, priority));
     amClient.removeContainerRequest(
         new ContainerRequest(capability, nodes, racks, priority));
-    
+
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
+
     int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
-        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
-        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
         .remoteRequest.getNumContainers();
 
     assertEquals(2, containersRequestedNode);
@@ -1253,4 +1267,108 @@ public class TestAMRMClient {
     }
     return result;
   }
+
+  @Test(timeout = 60000)
+  public void testGetMatchingFitWithProfiles() throws Exception {
+    cancelApp();
+    tearDown();
+    conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
+    startYARNCluster();
+    startApp();
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<ContainerRequest>createAMRMClient();
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      ProfileCapability capability1 = ProfileCapability.newInstance("minimum");
+      ProfileCapability capability2 = ProfileCapability.newInstance("default");
+      ProfileCapability capability3 = ProfileCapability.newInstance("maximum");
+      ProfileCapability capability4 = ProfileCapability
+          .newInstance("minimum", Resource.newInstance(2048, 1));
+      ProfileCapability capability5 = ProfileCapability.newInstance("default");
+      ProfileCapability capability6 = ProfileCapability
+          .newInstance("default", Resource.newInstance(2048, 1));
+      // http has the same capabilities as default
+      ProfileCapability capability7 = ProfileCapability.newInstance("http");
+
+      ContainerRequest storedContainer1 =
+          new ContainerRequest(capability1, nodes, racks, priority);
+      ContainerRequest storedContainer2 =
+          new ContainerRequest(capability2, nodes, racks, priority);
+      ContainerRequest storedContainer3 =
+          new ContainerRequest(capability3, nodes, racks, priority);
+      ContainerRequest storedContainer4 =
+          new ContainerRequest(capability4, nodes, racks, priority);
+      ContainerRequest storedContainer5 =
+          new ContainerRequest(capability5, nodes, racks, priority2);
+      ContainerRequest storedContainer6 =
+          new ContainerRequest(capability6, nodes, racks, priority);
+      ContainerRequest storedContainer7 =
+          new ContainerRequest(capability7, nodes, racks, priority);
+
+
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      amClient.addContainerRequest(storedContainer4);
+      amClient.addContainerRequest(storedContainer5);
+      amClient.addContainerRequest(storedContainer6);
+      amClient.addContainerRequest(storedContainer7);
+
+      // test matching of containers
+      List<? extends Collection<ContainerRequest>> matches;
+      ContainerRequest storedRequest;
+      // exact match
+      ProfileCapability testCapability1 =
+          ProfileCapability.newInstance("minimum");
+      matches = amClient
+          .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+              testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertEquals(storedContainer1, storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+
+      // exact matching with order maintained
+      // we should get back 3 matches - default + http because they have the
+      // same capability
+      ProfileCapability testCapability2 =
+          ProfileCapability.newInstance("default");
+      matches = amClient
+          .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+              testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for (ContainerRequest storedRequest1 : matches.get(0)) {
+        switch(i) {
+        case 0:
+          assertEquals(storedContainer2, storedRequest1);
+          break;
+        case 1:
+          assertEquals(storedContainer7, storedRequest1);
+          break;
+        }
+        i++;
+      }
+      amClient.removeContainerRequest(storedContainer5);
+
+      // matching with larger container. all requests returned
+      Resource testCapability3 = Resource.newInstance(8192, 8);
+      matches = amClient
+          .getMatchingRequests(priority, node, testCapability3);
+      assertEquals(3, matches.size());
+
+      Resource testCapability4 = Resource.newInstance(2048, 1);
+      matches = amClient.getMatchingRequests(priority, node, testCapability4);
+      assertEquals(1, matches.size());
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index 2db33c1..c9b9db2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -274,8 +275,10 @@ public class TestAMRMClientContainerRequest {
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location, boolean expectedRelaxLocality,
       ExecutionType executionType) {
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(request.getResourceProfile(), request.getCapability());
     ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
-        location, executionType, request.getCapability()).remoteRequest;
+        location, executionType, profileCapability).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(1, ask.getNumContainers());
     assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index c649071..87471cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -383,18 +384,21 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
 
+      ProfileCapability profileCapability =
+          ProfileCapability.newInstance(capability);
+
       int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
-          node, ExecutionType.GUARANTEED, capability).remoteRequest
+          node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
           .getNumContainers();
       int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
-          rack, ExecutionType.GUARANTEED, capability).remoteRequest
+          rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
           .getNumContainers();
       int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
       int oppContainersRequestedAny =
           amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
-              ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+              ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
               .getNumContainers();
 
       assertEquals(2, containersRequestedNode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 969fb70..a4a59fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -252,8 +253,10 @@ public class TestNMClient {
           racks, priority));
     }
 
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
     int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
         .remoteRequest.getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
new file mode 100644
index 0000000..d0f3f72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
@@ -0,0 +1,18 @@
+{
+    "minimum": {
+        "memory-mb" : 1024,
+        "vcores" : 1
+    },
+    "default" : {
+        "memory-mb" : 2048,
+        "vcores" : 2
+    },
+    "maximum" : {
+        "memory-mb": 4096,
+        "vcores" : 4
+    },
+    "http" : {
+        "memory-mb" : 2048,
+        "vcores" : 2
+    }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
index a95aadf..626e05d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfilesProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfileEntry;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -59,6 +61,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
   private List<Container> containersFromPreviousAttempts = null;
   private List<NMToken> nmTokens = null;
   private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
+  private Map<String, Resource> profiles = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -123,6 +126,9 @@ public class RegisterApplicationMasterResponsePBImpl extends
     if(schedulerResourceTypes != null) {
       addSchedulerResourceTypes();
     }
+    if (profiles != null) {
+      addResourceProfiles();
+    }
   }
 
 
@@ -433,6 +439,58 @@ public class RegisterApplicationMasterResponsePBImpl extends
     this.schedulerResourceTypes.addAll(types);
   }
 
+  private void addResourceProfiles() {
+    maybeInitBuilder();
+    builder.clearResourceProfiles();
+    if (profiles == null) {
+      return;
+    }
+    ResourceProfilesProto.Builder profilesBuilder =
+        ResourceProfilesProto.newBuilder();
+    for (Map.Entry<String, Resource> entry : profiles.entrySet()) {
+      ResourceProfileEntry.Builder entryBuilder =
+          ResourceProfileEntry.newBuilder();
+      entryBuilder.setName(entry.getKey());
+      entryBuilder.setResources(convertToProtoFormat(entry.getValue()));
+      profilesBuilder.addResourceProfilesMap(entryBuilder.build());
+    }
+    builder.setResourceProfiles(profilesBuilder.build());
+  }
+
+  private void initResourceProfiles() {
+    if (this.profiles != null) {
+      return;
+    }
+    this.profiles = new HashMap<>();
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+
+    if (p.hasResourceProfiles()) {
+      ResourceProfilesProto profilesProto = p.getResourceProfiles();
+      for (ResourceProfileEntry entry : profilesProto
+          .getResourceProfilesMapList()) {
+        this.profiles
+            .put(entry.getName(), convertFromProtoFormat(entry.getResources()));
+      }
+    }
+  }
+
+  @Override
+  public Map<String, Resource> getResourceProfiles() {
+    initResourceProfiles();
+    return this.profiles;
+  }
+
+  @Override
+  public void setResourceProfiles(Map<String, Resource> profilesMap) {
+    if (profilesMap == null) {
+      return;
+    }
+    initResourceProfiles();
+    this.profiles.clear();
+    this.profiles.putAll(profilesMap);
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
index 236a763..da07539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
@@ -130,8 +130,8 @@ public class ResourcePBImpl extends Resource {
               ResourceTypes.COUNTABLE;
       String units = entry.hasUnits() ? entry.getUnits() : "";
       Long value = entry.hasValue() ? entry.getValue() : 0L;
-      ResourceInformation ri =
-          ResourceInformation.newInstance(entry.getKey(), units, value, type);
+      ResourceInformation ri = ResourceInformation
+          .newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
       if (resources.containsKey(ri.getName())) {
         resources.get(ri.getName()).setResourceType(ri.getResourceType());
         resources.get(ri.getName()).setUnits(ri.getUnits());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa982d22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 9890296..39d772a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
@@ -40,6 +42,7 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
   private Priority priority = null;
   private Resource capability = null;
   private ExecutionTypeRequest executionTypeRequest = null;
+  private ProfileCapability profile = null;
   
   
   public ResourceRequestPBImpl() {
@@ -52,7 +55,7 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
   }
   
   public ResourceRequestProto getProto() {
-      mergeLocalToProto();
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
@@ -69,6 +72,9 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
       builder.setExecutionTypeRequest(
           ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
     }
+    if (this.profile != null) {
+      builder.setProfile(converToProtoFormat(this.profile));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -229,7 +235,8 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
         + ", Location: " + getResourceName()
         + ", Relax Locality: " + getRelaxLocality()
         + ", Execution Type Request: " + getExecutionTypeRequest()
-        + ", Node Label Expression: " + getNodeLabelExpression() + "}";
+        + ", Node Label Expression: " + getNodeLabelExpression()
+        + ", Resource Profile: " + getProfileCapability() + "}";
   }
 
   @Override
@@ -250,4 +257,34 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
     }
     builder.setNodeLabelExpression(nodeLabelExpression);
   }
+
+  @Override
+  public void setProfileCapability(ProfileCapability profileCapability) {
+    maybeInitBuilder();
+    if (profile == null) {
+      builder.clearProfile();
+    }
+    this.profile = profileCapability;
+  }
+
+  @Override
+  public ProfileCapability getProfileCapability() {
+    if (profile != null) {
+      return profile;
+    }
+    ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasProfile()) {
+      return null;
+    }
+    return new ProfileCapabilityPBImpl(p.getProfile());
+  }
+
+  private ProfileCapabilityProto converToProtoFormat(
+      ProfileCapability profileCapability) {
+    ProfileCapabilityPBImpl tmp = new ProfileCapabilityPBImpl();
+    tmp.setProfileName(profileCapability.getProfileName());
+    tmp.setProfileCapabilityOverride(
+        profileCapability.getProfileCapabilityOverride());
+    return tmp.getProto();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message