Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3BFEF200B11 for ; Mon, 13 Jun 2016 11:27:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3AA98160A19; Mon, 13 Jun 2016 09:27:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 37A47160A3C for ; Mon, 13 Jun 2016 11:27:35 +0200 (CEST) Received: (qmail 15619 invoked by uid 500); 13 Jun 2016 09:27:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 11898 invoked by uid 99); 13 Jun 2016 09:27:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jun 2016 09:27:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C7B98E108B; Mon, 13 Jun 2016 09:27:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vvasudev@apache.org To: common-commits@hadoop.apache.org Date: Mon, 13 Jun 2016 09:28:01 -0000 Message-Id: <690434cf00fb4d1d97510a15af4e8940@git.apache.org> In-Reply-To: <0fa20729022e4c55a0b081024e40b890@git.apache.org> References: <0fa20729022e4c55a0b081024e40b890@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/51] [abbrv] hadoop git commit: YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh) archived-at: Mon, 13 Jun 2016 09:27:37 -0000 YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/51432779 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/51432779 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/51432779 Branch: refs/heads/YARN-3926 Commit: 51432779588fdd741b4840601f5db637ec783d92 Parents: 5279af7 Author: Arun Suresh Authored: Sun Jun 12 09:42:38 2016 -0700 Committer: Arun Suresh Committed: Sun Jun 12 09:42:38 2016 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/client/api/AMRMClient.java | 43 +- .../yarn/client/api/async/AMRMClientAsync.java | 17 + .../yarn/client/api/impl/AMRMClientImpl.java | 294 +++------ .../client/api/impl/RemoteRequestsTable.java | 332 ++++++++++ .../client/api/impl/BaseAMRMProxyE2ETest.java | 197 ++++++ .../yarn/client/api/impl/TestAMRMClient.java | 26 +- .../impl/TestAMRMClientContainerRequest.java | 54 +- .../yarn/client/api/impl/TestAMRMProxy.java | 171 +---- .../api/impl/TestDistributedScheduling.java | 644 ++++++++++++------- .../yarn/client/api/impl/TestNMClient.java | 7 +- .../records/impl/pb/ResourceRequestPBImpl.java | 5 +- 11 files changed, 1204 insertions(+), 586 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 3ec0899..5f362c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -109,7 +110,7 @@ public abstract class AMRMClient extends final Priority priority; final boolean relaxLocality; final String nodeLabelsExpression; - final ExecutionType executionType; + final ExecutionTypeRequest executionTypeRequest; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -180,7 +181,7 @@ public abstract class AMRMClient extends Priority priority, boolean relaxLocality, String nodeLabelsExpression) { this(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression, - ExecutionType.GUARANTEED); + ExecutionTypeRequest.newInstance()); } /** @@ -203,12 +204,12 @@ public abstract class AMRMClient extends * @param nodeLabelsExpression * Set node labels to allocate resource, now we only support * asking for only a single node label - * @param executionType + * @param executionTypeRequest * Set the execution type of the container request. */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality, String nodeLabelsExpression, - ExecutionType executionType) { + ExecutionTypeRequest executionTypeRequest) { // Validate request Preconditions.checkArgument(capability != null, "The Resource to be requested for each container " + @@ -226,7 +227,7 @@ public abstract class AMRMClient extends this.priority = priority; this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; - this.executionType = executionType; + this.executionTypeRequest = executionTypeRequest; } public Resource getCapability() { @@ -253,15 +254,16 @@ public abstract class AMRMClient extends return nodeLabelsExpression; } - public ExecutionType getExecutionType() { - return executionType; + public ExecutionTypeRequest getExecutionTypeRequest() { + return executionTypeRequest; } public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); - sb.append("ExecutionType[").append(executionType).append("]"); + sb.append("ExecutionTypeRequest[").append(executionTypeRequest) + .append("]"); return sb.toString(); } } @@ -388,10 +390,35 @@ public abstract class AMRMClient extends * collection, requests will be returned in the same order as they were added. * @return Collection of request matching the parameters */ + @InterfaceStability.Evolving public abstract List> getMatchingRequests( Priority priority, String resourceName, Resource capability); + + /** + * Get outstanding ContainerRequests matching the given + * parameters. These ContainerRequests should have been added via + * addContainerRequest earlier in the lifecycle. For performance, + * the AMRMClient may return its internal collection directly without creating + * a copy. Users should not perform mutable operations on the return value. + * Each collection in the list contains requests with identical + * Resource size that fit in the given capability. In a + * collection, requests will be returned in the same order as they were added. + * specify an ExecutionType . + * @param priority Priority + * @param resourceName Location + * @param executionType ExecutionType + * @param capability Capability + * @return Collection of request matching the parameters + */ + @InterfaceStability.Evolving + public List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + Resource capability) { + throw new UnsupportedOperationException("The sub-class extending" + + " AMRMClient is expected to implement this !!"); + } /** * Update application's blacklist with addition or removal resources. http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 3c8f923..2f95156 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; @@ -196,6 +197,22 @@ extends AbstractService { Priority priority, String resourceName, Resource capability); + + /** + * Returns all matching ContainerRequests that match the given Priority, + * ResourceName, ExecutionType and Capability. + * @param priority Priority. + * @param resourceName Location. + * @param executionType ExecutionType. + * @param capability Capability. + * @return All matching ContainerRequests + */ + public List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + Resource capability) { + return client.getMatchingRequests(priority, resourceName, + executionType, capability); + } /** * Registers this application master with the resource manager. On successful http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4366c25..4145944 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,19 +19,19 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.TreeSet; import java.util.AbstractMap.SimpleEntry; @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; @@ -102,7 +104,7 @@ public class AMRMClientImpl extends AMRMClient { protected final Set blacklistAdditions = new HashSet(); protected final Set blacklistRemovals = new HashSet(); - class ResourceRequestInfo { + static class ResourceRequestInfo { ResourceRequest remoteRequest; LinkedHashSet containerRequests; @@ -115,11 +117,12 @@ public class AMRMClientImpl extends AMRMClient { } } - /** * Class compares Resource by memory then cpu in reverse order */ - class ResourceReverseMemoryThenCpuComparator implements Comparator { + static class ResourceReverseMemoryThenCpuComparator implements + Comparator, Serializable { + static final long serialVersionUID = 12345L; @Override public int compare(Resource arg0, Resource arg1) { long mem0 = arg0.getMemorySize(); @@ -141,7 +144,7 @@ public class AMRMClientImpl extends AMRMClient { return -1; } } - + static boolean canFit(Resource arg0, Resource arg1) { long mem0 = arg0.getMemorySize(); long mem1 = arg1.getMemorySize(); @@ -150,17 +153,8 @@ public class AMRMClientImpl extends AMRMClient { return (mem0 <= mem1 && cpu0 <= cpu1); } - - //Key -> Priority - //Value -> Map - //Key->ResourceName (e.g., nodename, rackname, *) - //Value->Map - //Key->Resource Capability - //Value->ResourceRequest - protected final - Map>> - remoteRequestsTable = - new TreeMap>>(); + + final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); @@ -185,6 +179,12 @@ public class AMRMClientImpl extends AMRMClient { super(AMRMClientImpl.class.getName()); } + @VisibleForTesting + AMRMClientImpl(ApplicationMasterProtocol protocol) { + super(AMRMClientImpl.class.getName()); + this.rmClient = protocol; + } + @Override protected void serviceInit(Configuration conf) throws Exception { RackResolver.init(conf); @@ -195,8 +195,10 @@ public class AMRMClientImpl extends AMRMClient { protected void serviceStart() throws Exception { final YarnConfiguration conf = new YarnConfiguration(getConfig()); try { - rmClient = - ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); + if (rmClient == null) { + rmClient = ClientRMProxy.createRMProxy( + conf, ApplicationMasterProtocol.class); + } } catch (IOException e) { throw new YarnRuntimeException(e); } @@ -263,7 +265,8 @@ public class AMRMClientImpl extends AMRMClient { // RPC layer is using it to send info across askList.add(ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression())); + r.getRelaxLocality(), r.getNodeLabelExpression(), + r.getExecutionTypeRequest())); } List increaseList = new ArrayList<>(); List decreaseList = new ArrayList<>(); @@ -315,13 +318,11 @@ public class AMRMClientImpl extends AMRMClient { synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); - for (Map> rr : remoteRequestsTable - .values()) { - for (Map capabalities : rr.values()) { - for (ResourceRequestInfo request : capabalities.values()) { - addResourceRequestToAsk(request.remoteRequest); - } - } + @SuppressWarnings("unchecked") + Iterator> reqIter = + remoteRequestsTable.iterator(); + while (reqIter.hasNext()) { + addResourceRequestToAsk(reqIter.next().remoteRequest); } change.putAll(this.pendingChange); } @@ -517,26 +518,28 @@ public class AMRMClientImpl extends AMRMClient { + joiner.join(req.getNodes())); } for (String node : dedupedNodes) { - addResourceRequest(req.getPriority(), node, req.getCapability(), req, - true, req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), node, + req.getExecutionTypeRequest(), req.getCapability(), req, true, + req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { - addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - true, req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), + req.getCapability(), req, true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { - addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - req.getRelaxLocality(), req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), + req.getCapability(), req, req.getRelaxLocality(), + req.getNodeLabelExpression()); } - // Off-switch - addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), ResourceRequest.ANY, + req.getExecutionTypeRequest(), req.getCapability(), req, + req.getRelaxLocality(), req.getNodeLabelExpression()); } @Override @@ -552,16 +555,18 @@ public class AMRMClientImpl extends AMRMClient { // Update resource requests if (req.getNodes() != null) { for (String node : new HashSet(req.getNodes())) { - decResourceRequest(req.getPriority(), node, req.getCapability(), req); + decResourceRequest(req.getPriority(), node, + req.getExecutionTypeRequest(), req.getCapability(), req); } } for (String rack : allRacks) { - decResourceRequest(req.getPriority(), rack, req.getCapability(), req); + decResourceRequest(req.getPriority(), rack, + req.getExecutionTypeRequest(), req.getCapability(), req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req); + req.getExecutionTypeRequest(), req.getCapability(), req); } @Override @@ -601,47 +606,38 @@ public class AMRMClientImpl extends AMRMClient { public synchronized int getClusterNodeCount() { return clusterNodeCount; } - + + @Override + public synchronized List> getMatchingRequests( + Priority priority, + String resourceName, + Resource capability) { + return getMatchingRequests(priority, resourceName, + ExecutionType.GUARANTEED, capability); + } + @Override public synchronized List> getMatchingRequests( - Priority priority, - String resourceName, - Resource capability) { + Priority priority, String resourceName, ExecutionType executionType, + Resource capability) { Preconditions.checkArgument(capability != null, "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, "The priority at which to request containers should not be null "); List> list = new LinkedList>(); - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - if (remoteRequests == null) { - return list; - } - TreeMap reqMap = remoteRequests - .get(resourceName); - if (reqMap == null) { - return list; - } - ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); - if (resourceRequestInfo != null && - !resourceRequestInfo.containerRequests.isEmpty()) { - list.add(resourceRequestInfo.containerRequests); - return list; - } - - // no exact match. Container may be larger than what was requested. - // get all resources <= capability. map is reverse sorted. - SortedMap tailMap = - reqMap.tailMap(capability); - for(Map.Entry entry : tailMap.entrySet()) { - if (canFit(entry.getKey(), capability) && - !entry.getValue().containerRequests.isEmpty()) { - // match found that fits in the larger resource - list.add(entry.getValue().containerRequests); + @SuppressWarnings("unchecked") + List> matchingRequests = + this.remoteRequestsTable.getMatchingRequests(priority, resourceName, + executionType, capability); + // If no exact match. Container may be larger than what was requested. + // get all resources <= capability. map is reverse sorted. + for (ResourceRequestInfo resReqInfo : matchingRequests) { + if (canFit(resReqInfo.remoteRequest.getCapability(), capability) && + !resReqInfo.containerRequests.isEmpty()) { + list.add(resReqInfo.containerRequests); } } - // no match found return list; } @@ -663,34 +659,30 @@ public class AMRMClientImpl extends AMRMClient { return racks; } - + /** * ContainerRequests with locality relaxation cannot be made at the same * priority as ContainerRequests without locality relaxation. */ private void checkLocalityRelaxationConflict(Priority priority, Collection locations, boolean relaxLocality) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - if (remoteRequests == null) { - return; - } // Locality relaxation will be set to relaxLocality for all implicitly // requested racks. Make sure that existing rack requests match this. - for (String location : locations) { - TreeMap reqs = - remoteRequests.get(location); - if (reqs != null && !reqs.isEmpty()) { - boolean existingRelaxLocality = - reqs.values().iterator().next().remoteRequest.getRelaxLocality(); - if (relaxLocality != existingRelaxLocality) { - throw new InvalidContainerRequestException("Cannot submit a " - + "ContainerRequest asking for location " + location - + " with locality relaxation " + relaxLocality + " when it has " - + "already been requested with locality relaxation " + existingRelaxLocality); - } - } + + @SuppressWarnings("unchecked") + List allCapabilityMaps = + remoteRequestsTable.getAllResourceRequestInfos(priority, locations); + for (ResourceRequestInfo reqs : allCapabilityMaps) { + ResourceRequest remoteRequest = reqs.remoteRequest; + boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); + if (relaxLocality != existingRelaxLocality) { + throw new InvalidContainerRequestException("Cannot submit a " + + "ContainerRequest asking for location " + + remoteRequest.getResourceName() + " with locality relaxation " + + relaxLocality + " when it has already been requested" + + "with locality relaxation " + existingRelaxLocality); } + } } /** @@ -747,46 +739,13 @@ public class AMRMClientImpl extends AMRMClient { ask.add(remoteRequest); } - private void - addResourceRequest(Priority priority, String resourceName, - Resource capability, T req, boolean relaxLocality, - String labelExpression) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - if (remoteRequests == null) { - remoteRequests = - new HashMap>(); - this.remoteRequestsTable.put(priority, remoteRequests); - if (LOG.isDebugEnabled()) { - LOG.debug("Added priority=" + priority); - } - } - TreeMap reqMap = - remoteRequests.get(resourceName); - if (reqMap == null) { - // capabilities are stored in reverse sorted order. smallest last. - reqMap = new TreeMap( - new ResourceReverseMemoryThenCpuComparator()); - remoteRequests.put(resourceName, reqMap); - } - ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); - if (resourceRequestInfo == null) { - resourceRequestInfo = - new ResourceRequestInfo(priority, resourceName, capability, - relaxLocality); - reqMap.put(capability, resourceRequestInfo); - } - - resourceRequestInfo.remoteRequest.setNumContainers( - resourceRequestInfo.remoteRequest.getNumContainers() + 1); - - if (relaxLocality) { - resourceRequestInfo.containerRequests.add(req); - } - - if (ResourceRequest.ANY.equals(resourceName)) { - resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); - } + private void addResourceRequest(Priority priority, String resourceName, + ExecutionTypeRequest execTypeReq, Resource capability, T req, + boolean relaxLocality, String labelExpression) { + @SuppressWarnings("unchecked") + ResourceRequestInfo resourceRequestInfo = remoteRequestsTable + .addResourceRequest(priority, resourceName, + execTypeReq, capability, req, relaxLocality, labelExpression); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); @@ -800,70 +759,31 @@ public class AMRMClientImpl extends AMRMClient { } } - private void decResourceRequest(Priority priority, - String resourceName, - Resource capability, - T req) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - - if(remoteRequests == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not decrementing resource as priority " + priority - + " is not present in request table"); - } - return; - } - - Map reqMap = remoteRequests.get(resourceName); - if (reqMap == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not decrementing resource as " + resourceName - + " is not present in request table"); - } - return; - } - ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); - - if (LOG.isDebugEnabled()) { - LOG.debug("BEFORE decResourceRequest:" + " applicationId=" - + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() - + " #asks=" + ask.size()); - } - - resourceRequestInfo.remoteRequest.setNumContainers( - resourceRequestInfo.remoteRequest.getNumContainers() - 1); - - resourceRequestInfo.containerRequests.remove(req); - - if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) { - // guard against spurious removals - resourceRequestInfo.remoteRequest.setNumContainers(0); - } + private void decResourceRequest(Priority priority, String resourceName, + ExecutionTypeRequest execTypeReq, Resource capability, T req) { + @SuppressWarnings("unchecked") + ResourceRequestInfo resourceRequestInfo = + remoteRequestsTable.decResourceRequest(priority, resourceName, + execTypeReq, capability, req); // send the ResourceRequest to RM even if is 0 because it needs to override // a previously sent value. If ResourceRequest was not sent previously then // sending 0 aught to be a no-op on RM - addResourceRequestToAsk(resourceRequestInfo.remoteRequest); + if (resourceRequestInfo != null) { + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); - // delete entries from map if no longer needed - if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { - reqMap.remove(capability); - if (reqMap.size() == 0) { - remoteRequests.remove(resourceName); + // delete entry from map if no longer needed + if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { + this.remoteRequestsTable.remove(priority, resourceName, + execTypeReq.getExecutionType(), capability); } - if (remoteRequests.size() == 0) { - remoteRequestsTable.remove(priority); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("AFTER decResourceRequest:" + " applicationId=" - + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() - + " #asks=" + ask.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("AFTER decResourceRequest:" + " applicationId=" + + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java new file mode 100644 index 0000000..853a512 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator; + +class RemoteRequestsTable implements Iterable{ + + private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class); + + static ResourceReverseMemoryThenCpuComparator resourceComparator = + new ResourceReverseMemoryThenCpuComparator(); + + /** + * Nested Iterator that iterates over just the ResourceRequestInfo + * object. + */ + class RequestInfoIterator implements Iterator { + private Iterator>>> iLocMap; + private Iterator>> iExecTypeMap; + private Iterator> iCapMap; + private Iterator iResReqInfo; + + public RequestInfoIterator(Iterator>>> + iLocationMap) { + this.iLocMap = iLocationMap; + if (iLocMap.hasNext()) { + iExecTypeMap = iLocMap.next().values().iterator(); + } else { + iExecTypeMap = + new LinkedList>>().iterator(); + } + if (iExecTypeMap.hasNext()) { + iCapMap = iExecTypeMap.next().values().iterator(); + } else { + iCapMap = + new LinkedList>() + .iterator(); + } + if (iCapMap.hasNext()) { + iResReqInfo = iCapMap.next().values().iterator(); + } else { + iResReqInfo = new LinkedList().iterator(); + } + } + + @Override + public boolean hasNext() { + return iLocMap.hasNext() + || iExecTypeMap.hasNext() + || iCapMap.hasNext() + || iResReqInfo.hasNext(); + } + + @Override + public ResourceRequestInfo next() { + if (!iResReqInfo.hasNext()) { + if (!iCapMap.hasNext()) { + if (!iExecTypeMap.hasNext()) { + iExecTypeMap = iLocMap.next().values().iterator(); + } + iCapMap = iExecTypeMap.next().values().iterator(); + } + iResReqInfo = iCapMap.next().values().iterator(); + } + return iResReqInfo.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported" + + "for this iterator !!"); + } + } + + // Nest map with Primary key : + // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource) + // and value : ResourceRequestInfo + private Map>>> remoteRequestsTable = new HashMap<>(); + + @Override + public Iterator iterator() { + return new RequestInfoIterator(remoteRequestsTable.values().iterator()); + } + + ResourceRequestInfo get(Priority priority, String location, + ExecutionType execType, Resource capability) { + TreeMap capabilityMap = + getCapabilityMap(priority, location, execType); + if (capabilityMap == null) { + return null; + } + return capabilityMap.get(capability); + } + + void put(Priority priority, String resourceName, ExecutionType execType, + Resource capability, ResourceRequestInfo resReqInfo) { + Map>> locationMap = + remoteRequestsTable.get(priority); + if (locationMap == null) { + locationMap = new HashMap<>(); + this.remoteRequestsTable.put(priority, locationMap); + if (LOG.isDebugEnabled()) { + LOG.debug("Added priority=" + priority); + } + } + Map> execTypeMap = + locationMap.get(resourceName); + if (execTypeMap == null) { + execTypeMap = new HashMap<>(); + locationMap.put(resourceName, execTypeMap); + if (LOG.isDebugEnabled()) { + LOG.debug("Added resourceName=" + resourceName); + } + } + TreeMap capabilityMap = + execTypeMap.get(execType); + if (capabilityMap == null) { + capabilityMap = new TreeMap<>(resourceComparator); + execTypeMap.put(execType, capabilityMap); + if (LOG.isDebugEnabled()) { + LOG.debug("Added Execution Type=" + execType); + } + } + capabilityMap.put(capability, resReqInfo); + } + + ResourceRequestInfo remove(Priority priority, String resourceName, + ExecutionType execType, Resource capability) { + ResourceRequestInfo retVal = null; + Map>> locationMap = remoteRequestsTable.get(priority); + if (locationMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No such priority=" + priority); + } + return null; + } + Map> + execTypeMap = locationMap.get(resourceName); + if (execTypeMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No such resourceName=" + resourceName); + } + return null; + } + TreeMap capabilityMap = + execTypeMap.get(execType); + if (capabilityMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No such Execution Type=" + execType); + } + return null; + } + retVal = capabilityMap.remove(capability); + if (capabilityMap.size() == 0) { + execTypeMap.remove(execType); + if (execTypeMap.size() == 0) { + locationMap.remove(resourceName); + if (locationMap.size() == 0) { + this.remoteRequestsTable.remove(priority); + } + } + } + return retVal; + } + + Map>> getLocationMap(Priority priority) { + return remoteRequestsTable.get(priority); + } + + Map> + getExecutionTypeMap(Priority priority, String location) { + Map>> locationMap = getLocationMap(priority); + if (locationMap == null) { + return null; + } + return locationMap.get(location); + } + + TreeMap getCapabilityMap(Priority + priority, String location, + ExecutionType execType) { + Map> + executionTypeMap = getExecutionTypeMap(priority, location); + if (executionTypeMap == null) { + return null; + } + return executionTypeMap.get(execType); + } + + @SuppressWarnings("unchecked") + List getAllResourceRequestInfos(Priority priority, + Collection locations) { + List retList = new LinkedList<>(); + for (String location : locations) { + for (ExecutionType eType : ExecutionType.values()) { + TreeMap capabilityMap = + getCapabilityMap(priority, location, eType); + if (capabilityMap != null) { + retList.addAll(capabilityMap.values()); + } + } + } + return retList; + } + + List getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + Resource capability) { + List list = new LinkedList<>(); + TreeMap capabilityMap = + getCapabilityMap(priority, resourceName, executionType); + if (capabilityMap != null) { + ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability); + if (resourceRequestInfo != null) { + list.add(resourceRequestInfo); + } else { + list.addAll(capabilityMap.tailMap(capability).values()); + } + } + return list; + } + + @SuppressWarnings("unchecked") + ResourceRequestInfo addResourceRequest(Priority priority, String resourceName, + ExecutionTypeRequest execTypeReq, Resource capability, T req, + boolean relaxLocality, String labelExpression) { + ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, + execTypeReq.getExecutionType(), capability); + if (resourceRequestInfo == null) { + resourceRequestInfo = + new ResourceRequestInfo(priority, resourceName, capability, + relaxLocality); + put(priority, resourceName, execTypeReq.getExecutionType(), capability, + resourceRequestInfo); + } + resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq); + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() + 1); + + if (relaxLocality) { + resourceRequestInfo.containerRequests.add(req); + } + + if (ResourceRequest.ANY.equals(resourceName)) { + resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); + } + return resourceRequestInfo; + } + + ResourceRequestInfo decResourceRequest(Priority priority, String resourceName, + ExecutionTypeRequest execTypeReq, Resource capability, T req) { + ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, + execTypeReq.getExecutionType(), capability); + + if (resourceRequestInfo == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not decrementing resource as ResourceRequestInfo with" + + "priority=" + priority + ", " + + "resourceName=" + resourceName + ", " + + "executionType=" + execTypeReq + ", " + + "capability=" + capability + " is not present in request table"); + } + return null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("BEFORE decResourceRequest:" + " applicationId=" + + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + resourceRequestInfo.remoteRequest.getNumContainers()); + } + + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() - 1); + + resourceRequestInfo.containerRequests.remove(req); + + if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) { + // guard against spurious removals + resourceRequestInfo.remoteRequest.setNumContainers(0); + } + return resourceRequestInfo; + } + + boolean isEmpty() { + return remoteRequestsTable.isEmpty(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java new file mode 100644 index 0000000..0b62054 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** + * Base test case to be used for Testing frameworks that use AMRMProxy. + */ +public abstract class BaseAMRMProxyE2ETest { + + protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, + ApplicationId appId, MiniYARNCluster cluster, + final Configuration yarnConf) + throws IOException, InterruptedException, YarnException { + + UserGroupInformation user = null; + + // Get the AMRMToken from AMRMProxy + + ApplicationReport report = rmClient.getApplicationReport(appId); + + user = UserGroupInformation.createProxyUser( + report.getCurrentApplicationAttemptId().toString(), + UserGroupInformation.getCurrentUser()); + + ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster + .getNodeManager(0).getNMContext().getContainerManager(); + + AMRMProxyTokenSecretManager amrmTokenSecretManager = + containerManager.getAMRMProxyService().getSecretManager(); + org.apache.hadoop.security.token.Token token = + amrmTokenSecretManager + .createAndGetAMRMToken(report.getCurrentApplicationAttemptId()); + + SecurityUtil.setTokenService(token, + containerManager.getAMRMProxyService().getBindAddress()); + user.addToken(token); + + // Start Application Master + + return user + .doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(yarnConf, + ApplicationMasterProtocol.class); + } + }); + } + + protected AllocateRequest createAllocateRequest(List listNode) { + // The test needs AMRMClient to create a real allocate request + AMRMClientImpl amClient = + new AMRMClientImpl<>(); + + Resource capability = Resource.newInstance(1024, 2); + Priority priority = Priority.newInstance(1); + List nodeReports = listNode; + String node = nodeReports.get(0).getNodeId().getHost(); + String[] nodes = new String[] {node}; + + AMRMClient.ContainerRequest storedContainer1 = + new AMRMClient.ContainerRequest(capability, nodes, null, priority); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer1); + + List resourceAsk = new ArrayList<>(); + for (ResourceRequest rr : amClient.ask) { + resourceAsk.add(rr); + } + + ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest + .newInstance(new ArrayList<>(), new ArrayList<>()); + + int responseId = 1; + + return AllocateRequest.newInstance(responseId, 0, resourceAsk, + new ArrayList<>(), resourceBlacklistRequest); + } + + protected ApplicationAttemptId createApp(YarnClient yarnClient, + MiniYARNCluster yarnCluster, Configuration conf) throws Exception { + + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + + appContext.setApplicationName("Test"); + + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + + appContext.setQueue("default"); + + ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( + Collections. emptyMap(), + new HashMap(), Arrays.asList("sleep", "10000"), + new HashMap(), null, + new HashMap()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + + SubmitApplicationRequest appRequest = + Records.newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + yarnClient.submitApplication(appContext); + + RMAppAttempt appAttempt = null; + ApplicationAttemptId attemptId = null; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport + .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + attemptId = + appReport.getCurrentApplicationAttemptId(); + appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + Thread.sleep(1000); + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken().setService( + ClientRMProxy.getAMRMTokenService(conf)); + return attemptId; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 75b49d0..99bfca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NMToken; @@ -413,11 +414,13 @@ public class TestAMRMClient { amClient.addContainerRequest(storedContainer3); // test addition and storage - int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); - containersRequestedAny = amClient.remoteRequestsTable.get(priority1) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + containersRequestedAny = amClient.remoteRequestsTable.get(priority1, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); List> matches = amClient.getMatchingRequests(priority, node, capability); @@ -919,12 +922,15 @@ public class TestAMRMClient { amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); - int containersRequestedNode = amClient.remoteRequestsTable.get(priority) - .get(node).get(capability).remoteRequest.getNumContainers(); - int containersRequestedRack = amClient.remoteRequestsTable.get(priority) - .get(rack).get(capability).remoteRequest.getNumContainers(); - int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + int containersRequestedNode = amClient.remoteRequestsTable.get(priority, + node, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedRack = amClient.remoteRequestsTable.get(priority, + rack, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedNode); assertEquals(2, containersRequestedRack); http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index cb8c86a..2db33c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -26,6 +26,8 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -35,6 +37,46 @@ import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.junit.Test; public class TestAMRMClientContainerRequest { + + @Test + public void testOpportunisticAndGuaranteedRequests() { + AMRMClientImpl client = + new AMRMClientImpl(); + + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + client.init(conf); + + Resource capability = Resource.newInstance(1024, 1); + ContainerRequest request = + new ContainerRequest(capability, new String[] {"host1", "host2"}, + new String[] {"/rack2"}, Priority.newInstance(1)); + client.addContainerRequest(request); + verifyResourceRequest(client, request, "host1", true); + verifyResourceRequest(client, request, "host2", true); + verifyResourceRequest(client, request, "/rack1", true); + verifyResourceRequest(client, request, "/rack2", true); + verifyResourceRequest(client, request, ResourceRequest.ANY, true); + ContainerRequest request2 = + new ContainerRequest(capability, new String[] {"host1", "host2"}, + new String[] {"/rack2"}, Priority.newInstance(1), true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)); + client.addContainerRequest(request2); + verifyResourceRequest(client, request, "host1", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "host2", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "/rack1", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "/rack2", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, ResourceRequest.ANY, true, + ExecutionType.OPPORTUNISTIC); + } + @Test public void testFillInRacks() { AMRMClientImpl client = @@ -224,8 +266,16 @@ public class TestAMRMClientContainerRequest { private void verifyResourceRequest( AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality) { - ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) - .get(location).get(request.getCapability()).remoteRequest; + verifyResourceRequest(client, request, location, expectedRelaxLocality, + ExecutionType.GUARANTEED); + } + + private void verifyResourceRequest( + AMRMClientImpl client, ContainerRequest request, + String location, boolean expectedRelaxLocality, + ExecutionType executionType) { + ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(), + location, executionType, request.getCapability()).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java index f1e3f03..33f7527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java @@ -19,20 +19,12 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -40,43 +32,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.ClientRMProxy; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; -public class TestAMRMProxy { +/** + * End-to-End test cases for the AMRMProxy Service. + */ +public class TestAMRMProxy extends BaseAMRMProxyE2ETest { private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class); @@ -84,7 +58,7 @@ public class TestAMRMProxy { * This test validates register, allocate and finish of an application through * the AMRMPRoxy. */ - @Test(timeout = 60000) + @Test(timeout = 120000) public void testAMRMProxyE2E() throws Exception { MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1); YarnClient rmClient = null; @@ -107,7 +81,8 @@ public class TestAMRMProxy { // Submit application - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -173,7 +148,7 @@ public class TestAMRMProxy { * that the received token it is different from the previous one within 5 * requests. */ - @Test(timeout = 60000) + @Test(timeout = 120000) public void testE2ETokenRenewal() throws Exception { MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1); @@ -201,7 +176,8 @@ public class TestAMRMProxy { // Submit - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -252,7 +228,7 @@ public class TestAMRMProxy { * This test validates that an AM cannot register directly to the RM, with the * token provided by the AMRMProxy. */ - @Test(timeout = 60000) + @Test(timeout = 120000) public void testE2ETokenSwap() throws Exception { MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1); YarnClient rmClient = null; @@ -270,7 +246,8 @@ public class TestAMRMProxy { rmClient.init(yarnConf); rmClient.start(); - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -290,124 +267,4 @@ public class TestAMRMProxy { cluster.stop(); } } - - protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, - ApplicationId appId, MiniYARNCluster cluster, - final Configuration yarnConf) - throws IOException, InterruptedException, YarnException { - - UserGroupInformation user = null; - - // Get the AMRMToken from AMRMProxy - - ApplicationReport report = rmClient.getApplicationReport(appId); - - user = UserGroupInformation.createProxyUser( - report.getCurrentApplicationAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - - ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster - .getNodeManager(0).getNMContext().getContainerManager(); - - AMRMProxyTokenSecretManager amrmTokenSecretManager = - containerManager.getAMRMProxyService().getSecretManager(); - org.apache.hadoop.security.token.Token token = - amrmTokenSecretManager - .createAndGetAMRMToken(report.getCurrentApplicationAttemptId()); - - SecurityUtil.setTokenService(token, - containerManager.getAMRMProxyService().getBindAddress()); - user.addToken(token); - - // Start Application Master - - return user - .doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationMasterProtocol run() throws Exception { - return ClientRMProxy.createRMProxy(yarnConf, - ApplicationMasterProtocol.class); - } - }); - } - - protected AllocateRequest createAllocateRequest(List listNode) { - // The test needs AMRMClient to create a real allocate request - AMRMClientImpl amClient = - new AMRMClientImpl(); - - Resource capability = Resource.newInstance(1024, 2); - Priority priority = Priority.newInstance(1); - List nodeReports = listNode; - String node = nodeReports.get(0).getNodeId().getHost(); - String[] nodes = new String[] { node }; - - ContainerRequest storedContainer1 = - new ContainerRequest(capability, nodes, null, priority); - amClient.addContainerRequest(storedContainer1); - amClient.addContainerRequest(storedContainer1); - - List resourceAsk = new ArrayList(); - for (ResourceRequest rr : amClient.ask) { - resourceAsk.add(rr); - } - - ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest - .newInstance(new ArrayList(), new ArrayList()); - - int responseId = 1; - - return AllocateRequest.newInstance(responseId, 0, resourceAsk, - new ArrayList(), resourceBlacklistRequest); - } - - protected ApplicationId createApp(YarnClient yarnClient, - MiniYARNCluster yarnCluster) throws Exception { - - ApplicationSubmissionContext appContext = - yarnClient.createApplication().getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); - - appContext.setApplicationName("Test"); - - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(0); - appContext.setPriority(pri); - - appContext.setQueue("default"); - - ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( - Collections. emptyMap(), - new HashMap(), Arrays.asList("sleep", "10000"), - new HashMap(), null, - new HashMap()); - appContext.setAMContainerSpec(amContainer); - appContext.setResource(Resource.newInstance(1024, 1)); - - SubmitApplicationRequest appRequest = - Records.newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - - yarnClient.submitApplication(appContext); - - RMAppAttempt appAttempt = null; - while (true) { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport - .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { - ApplicationAttemptId attemptId = - appReport.getCurrentApplicationAttemptId(); - appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() - .get(attemptId.getApplicationId()).getCurrentAppAttempt(); - while (true) { - if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - break; - } - } - break; - } - } - Thread.sleep(1000); - return appId; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org