From common-commits-return-78288-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Tue Jan 30 19:09:57 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id C285C18066D for ; Tue, 30 Jan 2018 19:09:57 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B272C160C59; Tue, 30 Jan 2018 18:09:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 38F51160C55 for ; Tue, 30 Jan 2018 19:09:56 +0100 (CET) Received: (qmail 49454 invoked by uid 500); 30 Jan 2018 18:09:46 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 48637 invoked by uid 99); 30 Jan 2018 18:09:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jan 2018 18:09:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5727F4DD0; Tue, 30 Jan 2018 18:09:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Tue, 30 Jan 2018 18:10:21 -0000 Message-Id: <7cadf8c5080b49ee9f1929eb8285fb30@git.apache.org> In-Reply-To: <72697fcdd04b4ac18856e7c4cefe773b@git.apache.org> References: <72697fcdd04b4ac18856e7c4cefe773b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] hadoop git commit: YARN-7682. Expose canSatisfyConstraints utility function to validate a placement against a constraint. (Panagiotis Garefalakis via asuresh) YARN-7682. Expose canSatisfyConstraints utility function to validate a placement against a constraint. (Panagiotis Garefalakis via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5df2556b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5df2556b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5df2556b Branch: refs/heads/YARN-6592 Commit: 5df2556b5ad1e54a8a3a9d99b295dd764f5c801a Parents: 1350094 Author: Arun Suresh Authored: Wed Jan 3 08:00:50 2018 -0800 Committer: Arun Suresh Committed: Tue Jan 30 07:53:34 2018 -0800 ---------------------------------------------------------------------- .../constraint/PlacementConstraintsUtil.java | 132 +++++++++ .../algorithm/DefaultPlacementAlgorithm.java | 55 +--- .../TestPlacementConstraintsUtil.java | 287 +++++++++++++++++++ .../constraint/TestPlacementProcessor.java | 204 +++++++++++-- 4 files changed, 601 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5df2556b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java new file mode 100644 index 0000000..956a3c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import java.util.Iterator; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; + +/** + * This class contains various static methods used by the Placement Algorithms + * to simplify constrained placement. + * (see also {@link DefaultPlacementAlgorithm}). + */ +@Public +@Unstable +public final class PlacementConstraintsUtil { + + // Suppresses default constructor, ensuring non-instantiability. + private PlacementConstraintsUtil() { + } + + /** + * Returns true if **single** application constraint with associated + * allocationTags and scope is satisfied by a specific scheduler Node. + * + * @param appId the application id + * @param sc the placement constraint + * @param te the target expression + * @param node the scheduler node + * @param tm the allocation tags store + * @return true if single application constraint is satisfied by node + * @throws InvalidAllocationTagsQueryException + */ + private static boolean canSatisfySingleConstraintExpression( + ApplicationId appId, SingleConstraint sc, TargetExpression te, + SchedulerNode node, AllocationTagsManager tm) + throws InvalidAllocationTagsQueryException { + long minScopeCardinality = 0; + long maxScopeCardinality = 0; + if (sc.getScope() == PlacementConstraints.NODE) { + minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, + te.getTargetValues(), Long::max); + maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, + te.getTargetValues(), Long::min); + } else if (sc.getScope() == PlacementConstraints.RACK) { + minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId, + te.getTargetValues(), Long::max); + maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId, + te.getTargetValues(), Long::min); + } + // Make sure Anti-affinity satisfies hard upper limit + maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1 + : maxScopeCardinality; + + return (minScopeCardinality >= sc.getMinCardinality() + && maxScopeCardinality < sc.getMaxCardinality()); + } + + /** + * Returns true if all application constraints with associated allocationTags + * are **currently** satisfied by a specific scheduler Node. + * To do so the method retrieves and goes through all application constraint + * expressions and checks if the specific allocation is between the allowed + * min-max cardinality values under the constraint scope (Node/Rack/etc). + * + * @param appId the application id + * @param allocationTags the allocation tags set + * @param node the scheduler node + * @param pcm the placement constraints store + * @param tagsManager the allocation tags store + * @return true if all application constraints are satisfied by node + * @throws InvalidAllocationTagsQueryException + */ + public static boolean canSatisfyConstraints(ApplicationId appId, + Set allocationTags, SchedulerNode node, + PlacementConstraintManager pcm, AllocationTagsManager tagsManager) + throws InvalidAllocationTagsQueryException { + PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags); + if (constraint == null) { + return true; + } + // Transform to SimpleConstraint + SingleConstraintTransformer singleTransformer = + new SingleConstraintTransformer(constraint); + constraint = singleTransformer.transform(); + AbstractConstraint sConstraintExpr = constraint.getConstraintExpr(); + SingleConstraint single = (SingleConstraint) sConstraintExpr; + // Iterate through TargetExpressions + Iterator expIt = single.getTargetExpressions().iterator(); + while (expIt.hasNext()) { + TargetExpression currentExp = expIt.next(); + // Supporting AllocationTag Expressions for now + if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) { + // Check if conditions are met + if (!canSatisfySingleConstraintExpression(appId, single, currentExp, + node, tagsManager)) { + return false; + } + } + } + // return true if all targetExpressions are satisfied + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5df2556b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index 395c156..9ed9ab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -19,19 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algor import java.util.Iterator; import java.util.List; -import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; @@ -65,58 +62,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { .getNodes(filter); } - /** - * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682) - * @param applicationId - * @param allocationTags - * @param nodeId - * @param tagsManager - * @return boolean - * @throws InvalidAllocationTagsQueryException - */ - public boolean canAssign(ApplicationId applicationId, - Set allocationTags, NodeId nodeId, - AllocationTagsManager tagsManager) - throws InvalidAllocationTagsQueryException { - PlacementConstraint constraint = - constraintManager.getConstraint(applicationId, allocationTags); - if (constraint == null) { - return true; - } - // TODO: proper transformations - // Currently works only for simple anti-affinity - // NODE scope target expressions - PlacementConstraintTransformations.SpecializedConstraintTransformer transformer = - new PlacementConstraintTransformations.SpecializedConstraintTransformer( - constraint); - PlacementConstraint transform = transformer.transform(); - PlacementConstraint.TargetConstraint targetConstraint = - (PlacementConstraint.TargetConstraint) transform.getConstraintExpr(); - // Assume a single target expression tag; - // The Sample Algorithm assumes a constraint will always be a simple - // Target Constraint with a single entry in the target set. - // As mentioned in the class javadoc - This algorithm should be - // used mostly for testing and validating end-2-end workflow. - String targetTag = targetConstraint.getTargetExpressions().iterator().next() - .getTargetValues().iterator().next(); - // TODO: Assuming anti-affinity constraint - long nodeCardinality = - tagsManager.getNodeCardinality(nodeId, applicationId, targetTag); - if (nodeCardinality != 0) { - return false; - } - // return true if it is a valid placement - return true; - } - public boolean attemptPlacementOnNode(ApplicationId appId, SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) throws InvalidAllocationTagsQueryException { int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); if (numAllocs > 0) { - if (canAssign(appId, - schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(), - tagsManager)) { + if (PlacementConstraintsUtil.canSatisfyConstraints(appId, + schedulingRequest.getAllocationTags(), schedulerNode, + constraintManager, tagsManager)) { return true; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5df2556b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java new file mode 100644 index 0000000..7492233 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; + +/** + * Test the PlacementConstraint Utility class functionality. + */ +public class TestPlacementConstraintsUtil { + + private List rmNodes; + private RMContext rmContext; + private static final int GB = 1024; + private ApplicationId appId1; + private PlacementConstraint c1, c2, c3, c4; + private Set sourceTag1, sourceTag2; + private Map, PlacementConstraint> constraintMap1, constraintMap2; + + @Before + public void setup() { + MockRM rm = new MockRM(); + rm.start(); + MockNodes.resetHostIds(); + rmNodes = MockNodes.newNodes(2, 2, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode); + } + rmContext = rm.getRMContext(); + + // Build appIDs, constraints, source tags, and constraint map. + long ts = System.currentTimeMillis(); + appId1 = BuilderUtils.newApplicationId(ts, 123); + + c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m"))); + c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs"))); + c3 = PlacementConstraints + .build(targetNotIn(NODE, allocationTag("hbase-m"))); + c4 = PlacementConstraints + .build(targetNotIn(RACK, allocationTag("hbase-rs"))); + + sourceTag1 = new HashSet<>(Arrays.asList("spark")); + sourceTag2 = new HashSet<>(Arrays.asList("zk")); + + constraintMap1 = Stream + .of(new AbstractMap.SimpleEntry<>(sourceTag1, c1), + new AbstractMap.SimpleEntry<>(sourceTag2, c2)) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue)); + constraintMap2 = Stream + .of(new AbstractMap.SimpleEntry<>(sourceTag1, c3), + new AbstractMap.SimpleEntry<>(sourceTag2, c4)) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue)); + } + + @Test + public void testNodeAffinityAssignment() + throws InvalidAllocationTagsQueryException { + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + // Register App1 with affinity constraint map + pcm.registerApplication(appId1, constraintMap1); + // No containers are running so all 'zk' and 'spark' allocations should fail + // on every cluster NODE + Iterator nodeIterator = rmNodes.iterator(); + while (nodeIterator.hasNext()) { + RMNode currentNode = nodeIterator.next(); + FiCaSchedulerNode schedulerNode = TestUtils.getMockNode( + currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode, pcm, tm)); + } + /** + * Now place container: + * Node0:123 (Rack1): + * container_app1_1 (hbase-m) + */ + RMNode n0_r1 = rmNodes.get(0); + RMNode n1_r1 = rmNodes.get(1); + RMNode n2_r2 = rmNodes.get(2); + RMNode n3_r2 = rmNodes.get(3); + FiCaSchedulerNode schedulerNode0 = TestUtils + .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode1 = TestUtils + .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode2 = TestUtils + .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode3 = TestUtils + .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + // 1 Containers on node 0 with allocationTag 'hbase-m' + ContainerId hbase_m = ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); + tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); + + // 'spark' placement on Node0 should now SUCCEED + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode0, pcm, tm)); + // FAIL on the rest of the nodes + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode3, pcm, tm)); + } + + @Test + public void testRackAffinityAssignment() + throws InvalidAllocationTagsQueryException { + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + // Register App1 with affinity constraint map + pcm.registerApplication(appId1, constraintMap1); + /** + * Now place container: + * Node0:123 (Rack1): + * container_app1_1 (hbase-rs) + */ + RMNode n0_r1 = rmNodes.get(0); + RMNode n1_r1 = rmNodes.get(1); + RMNode n2_r2 = rmNodes.get(2); + RMNode n3_r2 = rmNodes.get(3); + // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs' + ContainerId hbase_m = ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); + tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs")); + + FiCaSchedulerNode schedulerNode0 = TestUtils + .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode1 = TestUtils + .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode2 = TestUtils + .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode3 = TestUtils + .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + // 'zk' placement on Rack1 should now SUCCEED + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode1, pcm, tm)); + + // FAIL on the rest of the RACKs + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode3, pcm, tm)); + } + + @Test + public void testNodeAntiAffinityAssignment() + throws InvalidAllocationTagsQueryException { + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + // Register App1 with anti-affinity constraint map + pcm.registerApplication(appId1, constraintMap2); + /** + * place container: + * Node0:123 (Rack1): + * container_app1_1 (hbase-m) + */ + RMNode n0_r1 = rmNodes.get(0); + RMNode n1_r1 = rmNodes.get(1); + RMNode n2_r2 = rmNodes.get(2); + RMNode n3_r2 = rmNodes.get(3); + FiCaSchedulerNode schedulerNode0 = TestUtils + .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode1 = TestUtils + .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode2 = TestUtils + .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode3 = TestUtils + .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + // 1 Containers on node 0 with allocationTag 'hbase-m' + ContainerId hbase_m = ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); + tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); + + // 'spark' placement on Node0 should now FAIL + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode0, pcm, tm)); + // SUCCEED on the rest of the nodes + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag1, schedulerNode3, pcm, tm)); + } + + @Test + public void testRackAntiAffinityAssignment() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + // Register App1 with anti-affinity constraint map + pcm.registerApplication(appId1, constraintMap2); + /** + * Place container: + * Node0:123 (Rack1): + * container_app1_1 (hbase-rs) + */ + RMNode n0_r1 = rmNodes.get(0); + RMNode n1_r1 = rmNodes.get(1); + RMNode n2_r2 = rmNodes.get(2); + RMNode n3_r2 = rmNodes.get(3); + // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs' + ContainerId hbase_m = ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); + tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs")); + + FiCaSchedulerNode schedulerNode0 = TestUtils + .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode1 = TestUtils + .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode2 = TestUtils + .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); + FiCaSchedulerNode schedulerNode3 = TestUtils + .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + + // 'zk' placement on Rack1 should FAIL + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode1, pcm, tm)); + + // SUCCEED on the rest of the RACKs + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + sourceTag2, schedulerNode3, pcm, tm)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5df2556b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index 87dd5b7..c260fe0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.RejectionReason; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -48,16 +49,21 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static java.lang.Thread.sleep; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; /** * This tests end2end workflow of the constraint placement framework. @@ -104,7 +110,7 @@ public class TestPlacementProcessor { } @Test(timeout = 300000) - public void testPlacement() throws Exception { + public void testAntiAffinityPlacement() throws Exception { HashMap nodes = new HashMap<>(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); nodes.put(nm1.getNodeId(), nm1); @@ -120,44 +126,174 @@ public class TestPlacementProcessor { nm4.registerNode(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' are restricted to 1 per NODE MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, - Collections.singletonMap( - Collections.singleton("foo"), + Collections.singletonMap(Collections.singleton("foo"), PlacementConstraints.build( - PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) - )); + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))))); am1.addSchedulingRequest( - Arrays.asList( - schedulingRequest(1, 1, 1, 512, "foo"), + Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"), schedulingRequest(1, 2, 1, 512, "foo"), schedulingRequest(1, 3, 1, 512, "foo"), - schedulingRequest(1, 5, 1, 512, "foo")) - ); + schedulingRequest(1, 5, 1, 512, "foo"))); AllocateResponse allocResponse = am1.schedule(); // send the request List allocatedContainers = new ArrayList<>(); allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - - while (allocatedContainers.size() < 4) { - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - nm3.nodeHeartbeat(true); - nm4.nodeHeartbeat(true); - LOG.info("Waiting for containers to be created for app 1..."); - sleep(1000); - allocResponse = am1.schedule(); - allocatedContainers.addAll(allocResponse.getAllocatedContainers()); - } + waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4); Assert.assertEquals(4, allocatedContainers.size()); - Set nodeIds = allocatedContainers.stream() - .map(x -> x.getNodeId()).collect(Collectors.toSet()); - // Ensure unique nodes + Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId()) + .collect(Collectors.toSet()); + // Ensure unique nodes (antiaffinity) Assert.assertEquals(4, nodeIds.size()); } @Test(timeout = 300000) + public void testCardinalityPlacement() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' should not exceed 4 per NODE + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap(Collections.singleton("foo"), + PlacementConstraints.build(PlacementConstraints + .targetCardinality(NODE, 0, 4, allocationTag("foo"))))); + am1.addSchedulingRequest( + Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 4, 1, 512, "foo"), + schedulingRequest(1, 5, 1, 512, "foo"), + schedulingRequest(1, 6, 1, 512, "foo"), + schedulingRequest(1, 7, 1, 512, "foo"), + schedulingRequest(1, 8, 1, 512, "foo"))); + AllocateResponse allocResponse = am1.schedule(); // send the request + List allocatedContainers = new ArrayList<>(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + // kick the scheduler + waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8); + + Assert.assertEquals(8, allocatedContainers.size()); + Map nodeIdContainerIdMap = + allocatedContainers.stream().collect( + Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting())); + // Ensure no more than 4 containers per node + for (NodeId n : nodeIdContainerIdMap.keySet()) { + Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5); + } + } + + @Test(timeout = 300000) + public void testAffinityPlacement() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' should be placed where + // containers with allocationTag 'bar' are already running + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap(Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetIn(NODE, allocationTag("bar"))))); + am1.addSchedulingRequest( + Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 4, 1, 512, "foo"), + schedulingRequest(1, 5, 1, 512, "foo"))); + AllocateResponse allocResponse = am1.schedule(); // send the request + List allocatedContainers = new ArrayList<>(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + // kick the scheduler + waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5); + + Assert.assertEquals(5, allocatedContainers.size()); + Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId()) + .collect(Collectors.toSet()); + // Ensure all containers end up on the same node (affinity) + Assert.assertEquals(1, nodeIds.size()); + } + + @Test(timeout = 300000) + public void testComplexPlacement() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + // Containers with allocationTag 'bar' should not exceed 1 per NODE + constraintMap.put(Collections.singleton("bar"), + PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar")))); + // Containers with allocationTag 'foo' should be placed where 'bar' exists + constraintMap.put(Collections.singleton("foo"), + PlacementConstraints.build(targetIn(NODE, allocationTag("bar")))); + // Containers with allocationTag 'foo' should not exceed 2 per NODE + constraintMap.put(Collections.singleton("foo"), PlacementConstraints + .build(targetCardinality(NODE, 0, 2, allocationTag("foo")))); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, constraintMap); + am1.addSchedulingRequest( + Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"), + schedulingRequest(1, 2, 1, 512, "bar"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 4, 1, 512, "foo"), + schedulingRequest(1, 5, 1, 512, "foo"), + schedulingRequest(1, 6, 1, 512, "foo"))); + AllocateResponse allocResponse = am1.schedule(); // send the request + List allocatedContainers = new ArrayList<>(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + // kick the scheduler + waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6); + + Assert.assertEquals(6, allocatedContainers.size()); + Map nodeIdContainerIdMap = + allocatedContainers.stream().collect( + Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting())); + // Ensure no more than 3 containers per node (1 'bar', 2 'foo') + for (NodeId n : nodeIdContainerIdMap.keySet()) { + Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4); + } + } + + @Test(timeout = 300000) public void testSchedulerRejection() throws Exception { HashMap nodes = new HashMap<>(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); @@ -174,6 +310,7 @@ public class TestPlacementProcessor { nm4.registerNode(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' are restricted to 1 per NODE MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, Collections.singletonMap( Collections.singleton("foo"), @@ -196,7 +333,6 @@ public class TestPlacementProcessor { rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); // kick the scheduler - while (allocCount < 11) { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); @@ -253,9 +389,10 @@ public class TestPlacementProcessor { nm2.registerNode(); nm3.registerNode(); nm4.registerNode(); - // No not register nm5 yet.. + // Do not register nm5 yet.. RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' are restricted to 1 per NODE MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, Collections.singletonMap( Collections.singleton("foo"), @@ -323,6 +460,7 @@ public class TestPlacementProcessor { nm4.registerNode(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + // Containers with allocationTag 'foo' are restricted to 1 per NODE MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, Collections.singletonMap( Collections.singleton("foo"), @@ -346,7 +484,6 @@ public class TestPlacementProcessor { rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); // kick the scheduler - while (allocCount < 11) { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); @@ -373,6 +510,21 @@ public class TestPlacementProcessor { rej.getReason()); } + private static void waitForContainerAllocation(Collection nodes, + MockAM am, List allocatedContainers, int containerNum) + throws Exception { + while (allocatedContainers.size() < containerNum) { + for (MockNM node : nodes) { + node.nodeHeartbeat(true); + } + LOG.info("Waiting for containers to be created for " + + am.getApplicationAttemptId().getApplicationId() + "..."); + sleep(1000); + AllocateResponse allocResponse = am.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + } + } + protected static SchedulingRequest schedulingRequest( int priority, long allocReqId, int cores, int mem, String... tags) { return schedulingRequest(priority, allocReqId, cores, mem, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org