From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Thu, 18 Jan 2018 22:11:16 -0000
Message-Id: <86fb53e1c00d47fe949a41cc82826a42@git.apache.org>
Subject: [2/3] hadoop git commit: YARN-6599. Support anti-affinity constraint via AppPlacementAllocator.
(Wangda Tan via asuresh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 73b4f9e..24c5a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import java.util.Iterator;
 import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -30,9 +32,12 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
 
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
+
 /**
  * This class contains various static methods used by the Placement Algorithms
  * to simplify constrained placement.
@@ -41,16 +46,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algori
 @Public
 @Unstable
 public final class PlacementConstraintsUtil {
+  private static final Log LOG =
+      LogFactory.getLog(PlacementConstraintsUtil.class);
 
   // Suppresses default constructor, ensuring non-instantiability.
   private PlacementConstraintsUtil() {
   }
 
   /**
-   * Returns true if **single** application constraint with associated
+   * Returns true if **single** placement constraint with associated
    * allocationTags and scope is satisfied by a specific scheduler Node.
    *
-   * @param appId the application id
+   * @param targetApplicationId the application id, which could be overridden
+   *                            by a target application id specified inside
+   *                            the allocation tags.
    * @param sc the placement constraint
    * @param te the target expression
    * @param node the scheduler node
@@ -59,32 +68,123 @@ public final class PlacementConstraintsUtil {
    * @throws InvalidAllocationTagsQueryException
    */
   private static boolean canSatisfySingleConstraintExpression(
-      ApplicationId appId, SingleConstraint sc, TargetExpression te,
-      SchedulerNode node, AllocationTagsManager tm)
+      ApplicationId targetApplicationId, SingleConstraint sc,
+      TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
       throws InvalidAllocationTagsQueryException {
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
+
+    // Optimizations to only check cardinality if necessary.
+    int desiredMinCardinality = sc.getMinCardinality();
+    int desiredMaxCardinality = sc.getMaxCardinality();
+    boolean checkMinCardinality = desiredMinCardinality > 0;
+    boolean checkMaxCardinality = desiredMaxCardinality < Integer.MAX_VALUE;
+
     if (sc.getScope().equals(PlacementConstraints.NODE)) {
-      minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
-          te.getTargetValues(), Long::max);
-      maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
-          te.getTargetValues(), Long::min);
+      if (checkMinCardinality) {
+        minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+            targetApplicationId, te.getTargetValues(), Long::max);
+      }
+      if (checkMaxCardinality) {
+        maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+            targetApplicationId, te.getTargetValues(), Long::min);
+      }
     } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
-      minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
-          te.getTargetValues(), Long::max);
-      maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
-          te.getTargetValues(), Long::min);
+      if (checkMinCardinality) {
+        minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+            targetApplicationId, te.getTargetValues(), Long::max);
+      }
+      if (checkMaxCardinality) {
+        maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+            targetApplicationId, te.getTargetValues(), Long::min);
+      }
     }
     // Make sure Anti-affinity satisfies hard upper limit
-    maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+    maxScopeCardinality = desiredMaxCardinality == 0 ?
+        maxScopeCardinality - 1
         : maxScopeCardinality;
-    return (minScopeCardinality >= sc.getMinCardinality()
-        && maxScopeCardinality < sc.getMaxCardinality());
+    return (desiredMinCardinality <= 0
+        || minScopeCardinality >= desiredMinCardinality) && (
+        desiredMaxCardinality == Integer.MAX_VALUE
+        || maxScopeCardinality < desiredMaxCardinality);
+  }
+
+  private static boolean canSatisfyNodePartitionConstraintExpression(
+      TargetExpression targetExpression, SchedulerNode schedulerNode) {
+    Set<String> values = targetExpression.getTargetValues();
+    if (values == null || values.isEmpty()) {
+      return schedulerNode.getPartition().equals(
+          RMNodeLabelsManager.NO_LABEL);
+    } else {
+      String nodePartition = values.iterator().next();
+      if (!nodePartition.equals(schedulerNode.getPartition())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      SingleConstraint singleConstraint, SchedulerNode schedulerNode,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    // Iterate through TargetExpressions
+    Iterator<TargetExpression> expIt =
+        singleConstraint.getTargetExpressions().iterator();
+    while (expIt.hasNext()) {
+      TargetExpression currentExp = expIt.next();
+      // Supporting AllocationTag Expressions for now
+      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+        // Check if conditions are met
+        if (!canSatisfySingleConstraintExpression(applicationId,
+            singleConstraint, currentExp, schedulerNode, tagsManager)) {
+          return false;
+        }
+      } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
+          && currentExp.getTargetKey().equals(NODE_PARTITION)) {
+        // This is a node partition expression, check it.
+        if (!canSatisfyNodePartitionConstraintExpression(currentExp,
+            schedulerNode)) {
+          return false;
+        }
+      }
+    }
+    // return true if all targetExpressions are satisfied
+    return true;
+  }
+
+  /**
+   * Returns true if all placement constraints are **currently** satisfied by a
+   * specific scheduler Node.
+   *
+   * To do so the method retrieves and goes through all application constraint
+   * expressions and checks if the specific allocation is between the allowed
+   * min-max cardinality values under the constraint scope (Node/Rack/etc).
+   *
+   * @param applicationId the application id
+   * @param placementConstraint placement constraint.
+   * @param node the scheduler node
+   * @param tagsManager the allocation tags store
+   * @return true if all application constraints are satisfied by node
+   * @throws InvalidAllocationTagsQueryException
+   */
+  public static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      PlacementConstraint placementConstraint, SchedulerNode node,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    if (placementConstraint == null) {
+      return true;
+    }
+    // Transform to SimpleConstraint
+    SingleConstraintTransformer singleTransformer =
+        new SingleConstraintTransformer(placementConstraint);
+    placementConstraint = singleTransformer.transform();
+    AbstractConstraint sConstraintExpr = placementConstraint.getConstraintExpr();
+    SingleConstraint single = (SingleConstraint) sConstraintExpr;
+
+    return canSatisfySingleConstraint(applicationId, single, node, tagsManager);
   }
 
   /**
-   * Returns true if all application constraints with associated allocationTags
+   * Returns true if all placement constraints with associated allocationTags
    * are **currently** satisfied by a specific scheduler Node.
* To do so the method retrieves and goes through all application constraint * expressions and checks if the specific allocation is between the allowed @@ -98,41 +198,12 @@ public final class PlacementConstraintsUtil { * @return true if all application constraints are satisfied by node * @throws InvalidAllocationTagsQueryException */ - public static boolean canSatisfyConstraints(ApplicationId appId, + public static boolean canSatisfySingleConstraint(ApplicationId appId, Set allocationTags, SchedulerNode node, PlacementConstraintManager pcm, AllocationTagsManager tagsManager) throws InvalidAllocationTagsQueryException { PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags); - if (constraint == null) { - return true; - } - // Transform to SimpleConstraint - SingleConstraintTransformer singleTransformer = - new SingleConstraintTransformer(constraint); - constraint = singleTransformer.transform(); - AbstractConstraint sConstraintExpr = constraint.getConstraintExpr(); - SingleConstraint single = (SingleConstraint) sConstraintExpr; - // Iterate through TargetExpressions - Iterator expIt = single.getTargetExpressions().iterator(); - while (expIt.hasNext()) { - TargetExpression currentExp = expIt.next(); - // Supporting AllocationTag Expressions for now - if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) { - // If source and tag allocation tags are the same, we do not enforce - // constraints with minimum cardinality. - if (currentExp.getTargetValues().equals(allocationTags) - && single.getMinCardinality() > 0) { - return true; - } - // Check if conditions are met - if (!canSatisfySingleConstraintExpression(appId, single, currentExp, - node, tagsManager)) { - return false; - } - } - } - // return true if all targetExpressions are satisfied - return true; + return canSatisfySingleConstraint(appId, constraint, node, tagsManager); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index 9ed9ab1..eb3fe88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -67,7 +67,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { throws InvalidAllocationTagsQueryException { int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); if (numAllocs > 0) { - if (PlacementConstraintsUtil.canSatisfyConstraints(appId, + if (PlacementConstraintsUtil.canSatisfySingleConstraint(appId, schedulingRequest.getAllocationTags(), schedulerNode, 
constraintManager, tagsManager)) { return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java index 8e9c79c..2a6b889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java @@ -188,12 +188,18 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor { @Override public void allocate(ApplicationAttemptId appAttemptId, AllocateRequest request, AllocateResponse response) throws YarnException { + // Copy the scheduling request since we will clear it later after sending + // to dispatcher List schedulingRequests = - request.getSchedulingRequests(); + new ArrayList<>(request.getSchedulingRequests()); dispatchRequestsForPlacement(appAttemptId, schedulingRequests); reDispatchRetryableRequests(appAttemptId); schedulePlacedRequests(appAttemptId); + // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest + // added to scheduler. 
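+      // (The processor consumes SchedulingRequests itself via the placement
+      // dispatcher above; clearing them here keeps the downstream scheduler
+      // from handling the same asks a second time.)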
+ request.setSchedulingRequests(Collections.emptyList()); + nextAMSProcessor.allocate(appAttemptId, request, response); handleRejectedRequests(appAttemptId, response); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 86d9fd7..be20dae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -790,9 +791,9 @@ public class FairScheduler extends @Override public Allocation allocate(ApplicationAttemptId appAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, - ContainerUpdates updateRequests) { + List ask, List schedulingRequests, + List release, List blacklistAdditions, + List blacklistRemovals, ContainerUpdates updateRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); @@ -817,7 +818,9 @@ public class FairScheduler extends handleContainerUpdates(application, updateRequests); // Sanity check - normalizeRequests(ask); + normalizeResourceRequests(ask); + + // TODO, normalize SchedulingRequest // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); @@ -839,6 +842,7 @@ public class FairScheduler extends // Update application requests application.updateResourceRequests(ask); + // TODO, handle SchedulingRequest application.showRequests(); } } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 826575d..7c46f57 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -322,8 +323,8 @@ public class FifoScheduler extends @Override public Allocation allocate(ApplicationAttemptId applicationAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, + List ask, List schedulingRequests, + List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { FifoAppAttempt application = getApplicationAttempt(applicationAttemptId); if (application == null) { @@ -344,7 +345,7 @@ public class FifoScheduler extends } // Sanity check - normalizeRequests(ask); + normalizeResourceRequests(ask); // Release containers releaseContainers(release, application); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index 5c49450..72a6c4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -29,7 +31,6 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; /** @@ -50,13 +51,18 @@ import java.util.Map; * requests. *
*/ -public interface AppPlacementAllocator { +public abstract class AppPlacementAllocator { + protected AppSchedulingInfo appSchedulingInfo; + protected SchedulerRequestKey schedulerRequestKey; + protected RMContext rmContext; + /** * Get iterator of preferred node depends on requirement and/or availability * @param candidateNodeSet input CandidateNodeSet * @return iterator of preferred node */ - Iterator getPreferredNodeIterator(CandidateNodeSet candidateNodeSet); + public abstract Iterator getPreferredNodeIterator( + CandidateNodeSet candidateNodeSet); /** * Replace existing pending asks by the new requests @@ -66,15 +72,29 @@ public interface AppPlacementAllocator { * requests for preempted container * @return true if total pending resource changed */ - PendingAskUpdateResult updatePendingAsk( + public abstract PendingAskUpdateResult updatePendingAsk( Collection requests, boolean recoverPreemptedRequestForAContainer); /** + * Replace existing pending asks by the new SchedulingRequest + * + * @param schedulerRequestKey scheduler request key + * @param schedulingRequest new asks + * @param recoverPreemptedRequestForAContainer if we're recovering resource + * requests for preempted container + * @return true if total pending resource changed + */ + public abstract PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer); + + /** * Get pending ResourceRequests by given schedulerRequestKey * @return Map of resourceName to ResourceRequest */ - Map getResourceRequests(); + public abstract Map getResourceRequests(); /** * Get pending ask for given resourceName. If there's no such pendingAsk, @@ -83,7 +103,7 @@ public interface AppPlacementAllocator { * @param resourceName resourceName * @return PendingAsk */ - PendingAsk getPendingAsk(String resourceName); + public abstract PendingAsk getPendingAsk(String resourceName); /** * Get #pending-allocations for given resourceName. If there's no such @@ -92,7 +112,7 @@ public interface AppPlacementAllocator { * @param resourceName resourceName * @return #pending-allocations */ - int getOutstandingAsksCount(String resourceName); + public abstract int getOutstandingAsksCount(String resourceName); /** * Notify container allocated. @@ -103,7 +123,7 @@ public interface AppPlacementAllocator { * the container. This will be used by scheduler to recover requests. * Please refer to {@link ContainerRequest} for more details. */ - ContainerRequest allocate(SchedulerRequestKey schedulerKey, + public abstract ContainerRequest allocate(SchedulerRequestKey schedulerKey, NodeType type, SchedulerNode node); /** @@ -112,7 +132,7 @@ public interface AppPlacementAllocator { * @param node which node we will allocate on * @return true if we has pending requirement */ - boolean canAllocate(NodeType type, SchedulerNode node); + public abstract boolean canAllocate(NodeType type, SchedulerNode node); /** * Can delay to give locality? @@ -123,16 +143,16 @@ public interface AppPlacementAllocator { * @param resourceName resourceName * @return can/cannot */ - boolean canDelayTo(String resourceName); + public abstract boolean canDelayTo(String resourceName); /** - * Does this {@link AppPlacementAllocator} accept resources on nodePartition? + * Does this {@link AppPlacementAllocator} accept resources on given node? 
* - * @param nodePartition nodePartition + * @param schedulerNode schedulerNode * @param schedulingMode schedulingMode * @return accepted/not */ - boolean acceptNodePartition(String nodePartition, + public abstract boolean precheckNode(SchedulerNode schedulerNode, SchedulingMode schedulingMode); /** @@ -142,7 +162,7 @@ public interface AppPlacementAllocator { * * @return primary requested node partition */ - String getPrimaryRequestedNodePartition(); + public abstract String getPrimaryRequestedNodePartition(); /** * @return number of unique location asks with #pending greater than 0, @@ -152,18 +172,24 @@ public interface AppPlacementAllocator { * and should belong to specific delay scheduling policy impl. * See YARN-7457 for more details. */ - int getUniqueLocationAsks(); + public abstract int getUniqueLocationAsks(); /** * Print human-readable requests to LOG debug. */ - void showRequests(); + public abstract void showRequests(); /** - * Set app scheduling info. + * Initialize this allocator, this will be called by Factory automatically * - * @param appSchedulingInfo - * app info object. + * @param appSchedulingInfo appSchedulingInfo + * @param schedulerRequestKey schedulerRequestKey + * @param rmContext rmContext */ - void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo); + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + this.appSchedulingInfo = appSchedulingInfo; + this.rmContext = rmContext; + this.schedulerRequestKey = schedulerRequestKey; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index be1c1cc..a0358b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -22,8 +22,9 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -46,26 +47,18 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock; * containers. */ public class LocalityAppPlacementAllocator - implements AppPlacementAllocator { + extends AppPlacementAllocator { private static final Log LOG = LogFactory.getLog(LocalityAppPlacementAllocator.class); private final Map resourceRequestMap = new ConcurrentHashMap<>(); - private AppSchedulingInfo appSchedulingInfo; private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; - public LocalityAppPlacementAllocator(AppSchedulingInfo info) { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - this.appSchedulingInfo = info; - } - public LocalityAppPlacementAllocator() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); @@ -182,6 +175,19 @@ public class LocalityAppPlacementAllocator } @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer) + throws SchedulerInvalidResoureRequestException { + throw new SchedulerInvalidResoureRequestException(this.getClass().getName() + + " not be able to handle SchedulingRequest, there exists a " + + "ResourceRequest with the same scheduler key=" + schedulerRequestKey + + ", please send SchedulingRequest with a different allocationId and " + + "priority"); + } + + @Override public Map getResourceRequests() { return resourceRequestMap; } @@ -362,13 +368,13 @@ public class LocalityAppPlacementAllocator } @Override - public boolean acceptNodePartition(String nodePartition, + public boolean precheckNode(SchedulerNode schedulerNode, SchedulingMode schedulingMode) { // We will only look at node label = nodeLabelToLookAt according to // schedulingMode and partition of node. 
String nodePartitionToLookAt; if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - nodePartitionToLookAt = nodePartition; + nodePartitionToLookAt = schedulerNode.getPartition(); } else { nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; } @@ -425,9 +431,4 @@ public class LocalityAppPlacementAllocator writeLock.unlock(); } } - - @Override - public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) { - this.appSchedulingInfo = appSchedulingInfo; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java new file mode 100644 index 0000000..f8f758c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -0,0 +1,531 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; + +/** + * This is a simple implementation to do affinity or anti-affinity for + * inter/intra apps. + */ +public class SingleConstraintAppPlacementAllocator + extends AppPlacementAllocator { + private static final Log LOG = + LogFactory.getLog(SingleConstraintAppPlacementAllocator.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + private SchedulingRequest schedulingRequest = null; + private String targetNodePartition; + private Set targetAllocationTags; + private AllocationTagsManager allocationTagsManager; + + public SingleConstraintAppPlacementAllocator() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( + CandidateNodeSet candidateNodeSet) { + // Now only handle the case that single node in the candidateNodeSet + // TODO, Add support to multi-hosts inside candidateNodeSet which is passed + // in. 
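+      // (With more than one node in the candidate set, getSingleNode()
+      // returns null and an empty iterator is handed back, so multi-node
+      // candidate sets are effectively skipped for now.)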
+ + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); + if (null != singleNode) { + return IteratorUtils.singletonIterator(singleNode); + } + + return IteratorUtils.emptyIterator(); + } + + @Override + public PendingAskUpdateResult updatePendingAsk( + Collection requests, + boolean recoverPreemptedRequestForAContainer) { + if (requests != null && !requests.isEmpty()) { + throw new SchedulerInvalidResoureRequestException( + this.getClass().getName() + + " not be able to handle ResourceRequest, there exists a " + + "SchedulingRequest with the same scheduler key=" + + SchedulerRequestKey.create(requests.iterator().next()) + + ", please send ResourceRequest with a different allocationId and " + + "priority"); + } + + // Do nothing + return null; + } + + private PendingAskUpdateResult internalUpdatePendingAsk( + SchedulingRequest newSchedulingRequest, boolean recoverContainer) { + // When it is a recover container, there must exists an schedulingRequest. + if (recoverContainer && schedulingRequest == null) { + throw new SchedulerInvalidResoureRequestException("Trying to recover a " + + "container request=" + newSchedulingRequest.toString() + ", however" + + "there's no existing scheduling request, this should not happen."); + } + + if (schedulingRequest != null) { + // If we have an old scheduling request, we will make sure that no changes + // made except sizing. + // To avoid unnecessary copy of the data structure, we do this by + // replacing numAllocations with old numAllocations in the + // newSchedulingRequest#getResourceSizing, and compare the two objects. + ResourceSizing sizing = newSchedulingRequest.getResourceSizing(); + int existingNumAllocations = + schedulingRequest.getResourceSizing().getNumAllocations(); + + // When it is a recovered container request, just set + // #newAllocations = #existingAllocations + 1; + int newNumAllocations; + if (recoverContainer) { + newNumAllocations = existingNumAllocations + 1; + } else { + newNumAllocations = sizing.getNumAllocations(); + } + sizing.setNumAllocations(existingNumAllocations); + + // Compare two objects + if (!schedulingRequest.equals(newSchedulingRequest)) { + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + throw new SchedulerInvalidResoureRequestException( + "Invalid updated SchedulingRequest added to scheduler, " + + " we only allows changing numAllocations for the updated " + + "SchedulingRequest. Old=" + schedulingRequest.toString() + + " new=" + newSchedulingRequest.toString() + + ", if any fields need to be updated, please cancel the " + + "old request (by setting numAllocations to 0) and send a " + + "SchedulingRequest with different combination of " + + "priority/allocationId"); + } else { + if (newNumAllocations == existingNumAllocations) { + // No update on pending asks, return null. + return null; + } + } + + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + + // Basic sanity check + if (newNumAllocations < 0) { + throw new SchedulerInvalidResoureRequestException( + "numAllocation in ResourceSizing field must be >= 0, " + + "updating schedulingRequest failed."); + } + + PendingAskUpdateResult updateResult = new PendingAskUpdateResult( + new PendingAsk(schedulingRequest.getResourceSizing()), + new PendingAsk(newSchedulingRequest.getResourceSizing()), + targetNodePartition, targetNodePartition); + + // Ok, now everything is same except numAllocation, update numAllocation. 
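+      // (The temporary numAllocations swap above lets us reuse
+      // SchedulingRequest#equals to verify that nothing but the sizing
+      // changed, without deep-copying either request.)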
+ this.schedulingRequest.getResourceSizing().setNumAllocations( + newNumAllocations); + LOG.info( + "Update numAllocation from old=" + existingNumAllocations + " to new=" + + newNumAllocations); + + return updateResult; + } + + // For a new schedulingRequest, we need to validate if we support its asks. + // This will update internal partitions, etc. after the SchedulingRequest is + // valid. + validateAndSetSchedulingRequest(newSchedulingRequest); + + return new PendingAskUpdateResult(null, + new PendingAsk(newSchedulingRequest.getResourceSizing()), null, + targetNodePartition); + } + + @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest newSchedulingRequest, + boolean recoverPreemptedRequestForAContainer) { + writeLock.lock(); + try { + return internalUpdatePendingAsk(newSchedulingRequest, + recoverPreemptedRequestForAContainer); + } finally { + writeLock.unlock(); + } + } + + private String throwExceptionWithMetaInfo(String message) { + StringBuilder sb = new StringBuilder(); + sb.append("AppId=").append(appSchedulingInfo.getApplicationId()).append( + " Key=").append(this.schedulerRequestKey).append(". Exception message:") + .append(message); + throw new SchedulerInvalidResoureRequestException(sb.toString()); + } + + private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) + throws SchedulerInvalidResoureRequestException { + // Check sizing exists + if (newSchedulingRequest.getResourceSizing() == null + || newSchedulingRequest.getResourceSizing().getResources() == null) { + throwExceptionWithMetaInfo( + "No ResourceSizing found in the scheduling request, please double " + + "check"); + } + + // Check execution type == GUARANTEED + if (newSchedulingRequest.getExecutionType() != null + && newSchedulingRequest.getExecutionType().getExecutionType() + != ExecutionType.GUARANTEED) { + throwExceptionWithMetaInfo( + "Only GUARANTEED execution type is supported."); + } + + PlacementConstraint constraint = + newSchedulingRequest.getPlacementConstraint(); + + // We only accept SingleConstraint + PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr(); + if (!(ac instanceof PlacementConstraint.SingleConstraint)) { + throwExceptionWithMetaInfo( + "Only accepts " + PlacementConstraint.SingleConstraint.class.getName() + + " as constraint-expression. Rejecting the new added " + + "constraint-expression.class=" + ac.getClass().getName()); + } + + PlacementConstraint.SingleConstraint singleConstraint = + (PlacementConstraint.SingleConstraint) ac; + + // Make sure it is an anti-affinity request (actually this implementation + // should be able to support both affinity / anti-affinity without much + // effort. Considering potential test effort required. Limit to + // anti-affinity to intra-app and scope is node. + if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) { + throwExceptionWithMetaInfo( + "Only support scope=" + PlacementConstraints.NODE + + "now. 
PlacementConstraint=" + singleConstraint); + } + + if (singleConstraint.getMinCardinality() != 0 + || singleConstraint.getMaxCardinality() != 1) { + throwExceptionWithMetaInfo( + "Only support anti-affinity, which is: minCardinality=0, " + + "maxCardinality=1"); + } + + Set targetExpressionSet = + singleConstraint.getTargetExpressions(); + if (targetExpressionSet == null || targetExpressionSet.isEmpty()) { + throwExceptionWithMetaInfo( + "TargetExpression should not be null or empty"); + } + + // Set node partition + String nodePartition = null; + + // Target allocation tags + Set targetAllocationTags = null; + + for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) { + // Handle node partition + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) { + // For node attribute target, we only support Partition now. And once + // YARN-3409 is merged, we will support node attribute. + if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) { + throwExceptionWithMetaInfo("When TargetType=" + + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE + + " only " + NODE_PARTITION + " is accepted as TargetKey."); + } + + if (nodePartition != null) { + // This means we have duplicated node partition entry inside placement + // constraint, which might be set by mistake. + throwExceptionWithMetaInfo( + "Only one node partition targetExpression is allowed"); + } + + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + continue; + } + + if (values.size() > 1) { + throwExceptionWithMetaInfo("Inside one targetExpression, we only " + + "support affinity to at most one node partition now"); + } + + nodePartition = values.iterator().next(); + } else if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) { + // Handle allocation tags + if (targetAllocationTags != null) { + // This means we have duplicated AllocationTag expressions entries + // inside placement constraint, which might be set by mistake. + throwExceptionWithMetaInfo( + "Only one AllocationTag targetExpression is allowed"); + } + + if (targetExpression.getTargetValues() == null || targetExpression + .getTargetValues().isEmpty()) { + throwExceptionWithMetaInfo("Failed to find allocation tags from " + + "TargetExpressions or couldn't find self-app target."); + } + + targetAllocationTags = new HashSet<>( + targetExpression.getTargetValues()); + + if (targetExpression.getTargetKey() == null || !targetExpression + .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) { + throwExceptionWithMetaInfo( + "As of now, the only accepted target key for targetKey of " + + "allocation_tag target expression is: [" + + APPLICATION_LABEL_INTRA_APPLICATION + + "]. Please make changes to placement constraints " + + "accordingly."); + } + } + } + + if (targetAllocationTags == null) { + // That means we don't have ALLOCATION_TAG specified + throwExceptionWithMetaInfo( + "Couldn't find target expression with type == ALLOCATION_TAG, it is " + + "required to include one and only one target expression with " + + "type == ALLOCATION_TAG"); + + } + + if (nodePartition == null) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + } + + // Validation is done. 
set local results: + this.targetNodePartition = nodePartition; + this.targetAllocationTags = targetAllocationTags; + + this.schedulingRequest = new SchedulingRequestPBImpl( + ((SchedulingRequestPBImpl) newSchedulingRequest).getProto()); + + LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo + .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils + .join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition); + } + + @Override + @SuppressWarnings("unchecked") + public Map getResourceRequests() { + return Collections.EMPTY_MAP; + } + + @Override + public PendingAsk getPendingAsk(String resourceName) { + readLock.lock(); + try { + if (resourceName.equals("*") && schedulingRequest != null) { + return new PendingAsk(schedulingRequest.getResourceSizing()); + } + return PendingAsk.ZERO; + } finally { + readLock.unlock(); + } + + } + + @Override + public int getOutstandingAsksCount(String resourceName) { + readLock.lock(); + try { + if (resourceName.equals("*") && schedulingRequest != null) { + return schedulingRequest.getResourceSizing().getNumAllocations(); + } + return 0; + } finally { + readLock.unlock(); + } + } + + private void decreasePendingNumAllocation() { + // Deduct pending #allocations by 1 + ResourceSizing sizing = schedulingRequest.getResourceSizing(); + sizing.setNumAllocations(sizing.getNumAllocations() - 1); + } + + @Override + public ContainerRequest allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node) { + writeLock.lock(); + try { + // Per container scheduling request, it is just a copy of existing + // scheduling request with #allocations=1 + SchedulingRequest containerSchedulingRequest = new SchedulingRequestPBImpl( + ((SchedulingRequestPBImpl) schedulingRequest).getProto()); + containerSchedulingRequest.getResourceSizing().setNumAllocations(1); + + // Deduct sizing + decreasePendingNumAllocation(); + + return new ContainerRequest(containerSchedulingRequest); + } finally { + writeLock.unlock(); + } + } + + private boolean checkCardinalityAndPending(SchedulerNode node) { + // Do we still have pending resource? + if (schedulingRequest.getResourceSizing().getNumAllocations() <= 0) { + return false; + } + + // node type will be ignored. + try { + return PlacementConstraintsUtil.canSatisfySingleConstraint( + appSchedulingInfo.getApplicationId(), + this.schedulingRequest.getPlacementConstraint(), node, + allocationTagsManager); + } catch (InvalidAllocationTagsQueryException e) { + LOG.warn("Failed to query node cardinality:", e); + return false; + } + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + try { + readLock.lock(); + return checkCardinalityAndPending(node); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean canDelayTo(String resourceName) { + return true; + } + + @Override + public boolean precheckNode(SchedulerNode schedulerNode, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. + String nodePartitionToLookAt; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = schedulerNode.getPartition(); + } else{ + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + readLock.lock(); + try { + // Check node partition as well as cardinality/pending resources. 
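+      // (targetNodePartition was fixed when the SchedulingRequest was
+      // validated; under RESPECT_PARTITION_EXCLUSIVITY it must match the
+      // node's own partition, otherwise only the default partition matches.)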
+ return this.targetNodePartition.equals(nodePartitionToLookAt) + && checkCardinalityAndPending(schedulerNode); + } finally { + readLock.unlock(); + } + + } + + @Override + public String getPrimaryRequestedNodePartition() { + return targetNodePartition; + } + + @Override + public int getUniqueLocationAsks() { + return 1; + } + + @Override + public void showRequests() { + try { + readLock.lock(); + if (schedulingRequest != null) { + LOG.info(schedulingRequest.toString()); + } + } finally { + readLock.unlock(); + } + } + + @VisibleForTesting + SchedulingRequest getSchedulingRequest() { + return schedulingRequest; + } + + @VisibleForTesting + String getTargetNodePartition() { + return targetNodePartition; + } + + @VisibleForTesting + Set getTargetAllocationTags() { + return targetAllocationTags; + } + + @Override + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { + super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + this.allocationTagsManager = rmContext.getAllocationTagsManager(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index fbde681..7d1140d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -331,8 +331,7 @@ public class Application { // Get resources from the ResourceManager Allocation allocation = resourceManager.getResourceScheduler().allocate( - applicationAttemptId, new ArrayList(ask), - new ArrayList(), null, null, + applicationAttemptId, new ArrayList(ask), null, new ArrayList(), null, null, new ContainerUpdates()); if (LOG.isInfoEnabled()) { @@ -431,7 +430,7 @@ public class Application { if (type == NodeType.NODE_LOCAL) { for (String host : task.getHosts()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " type=" + type + " host=" + host + " request=" + ((requests == null) ? "null" : requests.get(host))); } @@ -442,7 +441,7 @@ public class Application { if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) { for (String rack : task.getRacks()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " type=" + type + " rack=" + rack + " request=" + ((requests == null) ? 
"null" : requests.get(rack))); } @@ -453,7 +452,7 @@ public class Application { updateResourceRequest(requests.get(ResourceRequest.ANY)); if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " #asks=" + ask.size()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 975abe6..9fa2c40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,14 +38,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -281,6 +285,53 @@ public class MockAM { } return allocate(req); } + + public AllocateResponse allocate(List resourceRequest, + List newSchedulingRequests, List releases) + throws Exception { + final AllocateRequest req = + AllocateRequest.newInstance(0, 0F, resourceRequest, + releases, null); + if (newSchedulingRequests != null) { + addSchedulingRequest(newSchedulingRequests); + } + if (!schedulingRequests.isEmpty()) { + req.setSchedulingRequests(schedulingRequests); + schedulingRequests.clear(); + } + return allocate(req); + } + + public AllocateResponse allocateIntraAppAntiAffinity( + ResourceSizing resourceSizing, Priority priority, long allocationId, + Set allocationTags, String... 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 0e4f308..4a5c671 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -474,7 +474,7 @@ public class TestRMAppAttemptTransitions {
     assertEquals(expectedState, applicationAttempt.getAppAttemptState());
     verify(scheduler, times(expectedAllocateCount)).allocate(
-        any(ApplicationAttemptId.class), any(List.class), any(List.class),
+        any(ApplicationAttemptId.class), any(List.class), eq(null), any(List.class),
         any(List.class), any(List.class), any(ContainerUpdates.class));
 
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -495,7 +495,7 @@ public class TestRMAppAttemptTransitions {
     // Check events
     verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
     verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class));
     verify(nmTokenManager).clearNodeSetForAttempt(
         applicationAttempt.getAppAttemptId());
@@ -643,7 +643,7 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(container));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class))).
         thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1161,7 +1161,7 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(amContainer));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class)))
         .thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1636,7 +1636,7 @@ public class TestRMAppAttemptTransitions {
   public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
     YarnScheduler mockScheduler = mock(YarnScheduler.class);
     when(mockScheduler.allocate(any(ApplicationAttemptId.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class)))
         .thenAnswer(new Answer<Allocation>() {
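All five hunks above apply one rule: the scheduler's allocate() now takes the SchedulingRequest list as an extra argument, so every Mockito stub and verification gains one more matcher in that position (eq(null) where the test wants to pin down that no scheduling requests are passed). The resulting stub shape, lifted from the hunks themselves:

    // One extra any(List.class) covers the new SchedulingRequest parameter.
    when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
        any(List.class), any(List.class), any(List.class), any(List.class),
        any(ContainerUpdates.class))).thenReturn(allocation);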

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index b927870..2bf6a21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -420,9 +421,10 @@ public class TestRMContainerImpl {
     when(rmContext.getYarnConfiguration()).thenReturn(conf);
 
     /* First container: ALLOCATED -> KILLED */
-    RMContainer rmContainer = new RMContainerImpl(container,
+    RMContainerImpl rmContainer = new RMContainerImpl(container,
         SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
@@ -448,6 +450,7 @@ public class TestRMContainerImpl {
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
 
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
@@ -468,6 +471,7 @@ public class TestRMContainerImpl {
     rmContainer = new RMContainerImpl(container,
         SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
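Why each container now gets setAllocationTags() before its START event: the container's tags are what the AllocationTagsManager records against the node, and the cardinality assertions in this test count exactly those tags. A short cause-and-effect sketch reusing the objects from the test above (the expected value of 1 assumes no other tagged containers on the node):

    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
    rmContainer.handle(
        new RMContainerEvent(containerId, RMContainerEventType.START));
    // One tagged container is now live on this node, so the max per-node
    // cardinality reported by the manager rises from 0 to 1.
    Assert.assertEquals(1,
        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));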

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 3692b29..b7b0eb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -46,7 +46,7 @@ public class TestAppSchedulingInfo {
     doReturn("test").when(queue).getQueueName();
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         appAttemptId, "test", queue, null, 0, new ResourceUsage(),
-        new HashMap());
+        new HashMap(), null);
 
     appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(),
         new ArrayList());
@@ -118,7 +118,7 @@ public class TestAppSchedulingInfo {
     doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
     AppSchedulingInfo info = new AppSchedulingInfo(
         appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
-        new ResourceUsage(), new HashMap());
+        new ResourceUsage(), new HashMap<>(), null);
 
     Assert.assertEquals(0, info.getSchedulerKeys().size());
     Priority pri1 = Priority.newInstance(1);
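Both constructor updates reflect the same widened AppSchedulingInfo signature: one more trailing argument after the scheduling-env map, which these tests simply pass as null. Judging from the initialize() method earlier in this patch, that extra argument feeds the placement allocators, but that is an inference from context, not something these hunks show. The updated construction in a test reads:

    AppSchedulingInfo info = new AppSchedulingInfo(
        appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
        new ResourceUsage(), new HashMap<>(), null); // new trailing argument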

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
new file mode 100644
index 0000000..5cea3a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.junit.Assert;
+
+import java.util.Set;
+
+public class CapacitySchedulerTestBase {
+  protected final int GB = 1024;
+
+  protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+  protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+  protected static final String A1 = A + ".a1";
+  protected static final String A2 = A + ".a2";
+  protected static final String B1 = B + ".b1";
+  protected static final String B2 = B + ".b2";
+  protected static final String B3 = B + ".b3";
+  protected static float A_CAPACITY = 10.5f;
+  protected static float B_CAPACITY = 89.5f;
+  protected static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
+  protected static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
+  protected static final String X1 = P1 + ".x1";
+  protected static final String X2 = P1 + ".x2";
+  protected static final String Y1 = P2 + ".y1";
+  protected static final String Y2 = P2 + ".y2";
+  protected static float A1_CAPACITY = 30;
+  protected static float A2_CAPACITY = 70;
+  protected static float B1_CAPACITY = 79.2f;
+  protected static float B2_CAPACITY = 0.8f;
+  protected static float B3_CAPACITY = 20;
+
+
+  @SuppressWarnings("unchecked")
+  protected <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+
+  protected void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(
+        memory,
+        queue.getQueueResourceUsage()
+            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemorySize());
+  }
+
+
+  protected void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertTrue(queue.getQueueResourceUsage()
+        .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+        .getMemorySize() > 0);
+  }
+}
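A hypothetical consumer of the extracted base class, to show how the shared queue constants and pending-resource helpers are meant to be used; the test name and the startRM() fixture are illustrative only:

    public class TestMyCapacityFeature extends CapacitySchedulerTestBase {
      @Test
      public void testPendingAccounting() throws Exception {
        MockRM rm = startRM(); // hypothetical fixture that boots a MockRM
        // After an app in root.a asks for 1 GB, queue "a" should show
        // 1 GB pending in the default partition.
        checkPendingResource(rm, "a", 1 * GB, null);
        checkPendingResourceGreaterThanZero(rm, "a", null);
      }
    }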

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e91f734..280be59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -166,33 +165,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
-public class TestCapacityScheduler {
+public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
-  private final int GB = 1024;
   private final static ContainerUpdates NULL_UPDATE_REQUESTS =
       new ContainerUpdates();
-
-  private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-  private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-  private static final String A1 = A + ".a1";
-  private static final String A2 = A + ".a2";
-  private static final String B1 = B + ".b1";
-  private static final String B2 = B + ".b2";
-  private static final String B3 = B + ".b3";
-  private static float A_CAPACITY = 10.5f;
-  private static float B_CAPACITY = 89.5f;
-  private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
-  private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
-  private static final String X1 = P1 + ".x1";
-  private static final String X2 = P1 + ".x2";
-  private static final String Y1 = P2 + ".y1";
-  private static final String Y2 = P2 + ".y2";
-  private static float A1_CAPACITY = 30;
-  private static float A2_CAPACITY = 70;
-  private static float B1_CAPACITY = 79.2f;
-  private static float B2_CAPACITY = 0.8f;
-  private static float B3_CAPACITY = 20;
-
   private ResourceManager resourceManager = null;
   private RMContext mockContext;
@@ -1115,12 +1091,12 @@ public class TestCapacityScheduler {
     cs.handle(addAttemptEvent);
 
     // Verify the blacklist can be updated independent of requesting containers
-    cs.allocate(appAttemptId, Collections.emptyList(),
+    cs.allocate(appAttemptId, Collections.emptyList(), null,
         Collections.emptyList(),
         Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
         .isPlaceBlacklisted(host));
-    cs.allocate(appAttemptId, Collections.emptyList(),
+    cs.allocate(appAttemptId, Collections.emptyList(), null,
         Collections.emptyList(), null,
         Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
@@ -1216,8 +1192,7 @@ public class TestCapacityScheduler {
 
     //This will allocate for app1
     cs.allocate(appAttemptId1,
-        Collections.singletonList(r1),
-        Collections.emptyList(),
+        Collections.singletonList(r1), null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
 
     //And this will result in container assignment for app1
@@ -1233,8 +1208,7 @@ public class TestCapacityScheduler {
     //Now, allocate for app2 (this would be the first/AM allocation)
     ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY,
         1*GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId2,
-        Collections.singletonList(r2),
-        Collections.emptyList(),
+        Collections.singletonList(r2), null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
 
     //In this case we do not perform container assignment because we want to
@@ -3480,12 +3454,6 @@ public class TestCapacityScheduler {
         + "queue-a's max capacity will be violated if container allocated");
   }
 
-  @SuppressWarnings("unchecked")
-  private <E> Set<E> toSet(E... elements) {
-    Set<E> set = Sets.newHashSet(elements);
-    return set;
-  }
-
   @Test
   public void testQueueHierarchyPendingResourceUpdate() throws Exception {
     Configuration conf =
@@ -3617,26 +3585,6 @@ public class TestCapacityScheduler {
     checkPendingResource(rm, "root", 0 * GB, "x");
   }
 
-  private void checkPendingResource(MockRM rm, String queueName, int memory,
-      String label) {
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    CSQueue queue = cs.getQueue(queueName);
-    Assert.assertEquals(
-        memory,
-        queue.getQueueResourceUsage()
-            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
-            .getMemorySize());
-  }
-
-  private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
-      String label) {
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    CSQueue queue = cs.getQueue(queueName);
-    Assert.assertTrue(queue.getQueueResourceUsage()
-        .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
-        .getMemorySize() > 0);
-  }
-
   // Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
   // lesser than minimumAllocation
   @Test(timeout = 30000)
@@ -3706,7 +3654,7 @@ public class TestCapacityScheduler {
 
     Allocation allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null,
+            null, Collections.<ContainerId> emptyList(), null, null,
             NULL_UPDATE_REQUESTS);
 
     Assert.assertNotNull(attempt);
@@ -3723,7 +3671,7 @@ public class TestCapacityScheduler {
 
     allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null,
+            null, Collections.<ContainerId> emptyList(), null, null,
            NULL_UPDATE_REQUESTS);
 
     // All resources should be sent as headroom
@@ -4249,8 +4197,7 @@ public class TestCapacityScheduler {
       y1Req = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId3,
-          Collections.singletonList(y1Req),
-          Collections.emptyList(),
+          Collections.singletonList(y1Req), null, Collections.emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4263,8 +4210,7 @@ public class TestCapacityScheduler {
       x1Req = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId1,
-          Collections.singletonList(x1Req),
-          Collections.emptyList(),
+          Collections.singletonList(x1Req), null, Collections.emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4276,8 +4222,7 @@ public class TestCapacityScheduler {
     x2Req = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId2,
-        Collections.singletonList(x2Req),
-        Collections.emptyList(),
+        Collections.singletonList(x2Req), null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
 
     assertEquals("X2 Used Resource should be 0", 0,
@@ -4288,8 +4233,7 @@ public class TestCapacityScheduler {
     x1Req = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId1,
-        Collections.singletonList(x1Req),
-        Collections.emptyList(),
+        Collections.singletonList(x1Req), null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
 
     assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
@@ -4302,8 +4246,7 @@ public class TestCapacityScheduler {
       y1Req = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId3,
-          Collections.singletonList(y1Req),
-          Collections.emptyList(),
+          Collections.singletonList(y1Req), null, Collections.emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4362,7 +4305,7 @@ public class TestCapacityScheduler {
         ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
     //This will allocate for app1
     cs.allocate(appAttemptId1, Collections.singletonList(r1),
-        Collections.emptyList(),
+        null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
     ResourceRequest r2 = null;
@@ -4370,8 +4313,7 @@ public class TestCapacityScheduler {
       r2 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId2,
-          Collections.singletonList(r2),
-          Collections.emptyList(),
+          Collections.singletonList(r2), null, Collections.emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4384,12 +4326,12 @@ public class TestCapacityScheduler {
     r2 = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId1, Collections.singletonList(r1),
-        Collections.emptyList(),
+        null, Collections.emptyList(),
         null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
 
     cs.allocate(appAttemptId2, Collections.singletonList(r2),
-        Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS);
+        null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     //Check blocked Resource
     assertEquals("A Used Resource should be 2 GB", 2 * GB,
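Every allocate() call in the hunks above changes the same way: null is spliced in as the new third argument (the SchedulingRequest list) and the remaining arguments shift one slot right. The updated call shape, taken directly from these hunks:

    // attempt, asks, scheduling requests (null when unused), releases,
    // blacklist additions, blacklist removals, container updates.
    cs.allocate(appAttemptId1, Collections.singletonList(r1), null,
        Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS);
    CapacityScheduler.schedule(cs);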

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b82addc0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 6cb21d4..1a77fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -103,7 +103,7 @@ public class TestCapacitySchedulerAsyncScheduling {
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
         numThreads);
     conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
-        + ".scheduling-interval-ms", 100);
+        + ".scheduling-interval-ms", 0);
 
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org