From common-commits-return-78419-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Jan 31 16:57:40 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id D188D180662 for ; Wed, 31 Jan 2018 16:57:40 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C14D1160C55; Wed, 31 Jan 2018 15:57:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2825A160C25 for ; Wed, 31 Jan 2018 16:57:39 +0100 (CET) Received: (qmail 7489 invoked by uid 500); 31 Jan 2018 15:57:28 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 7357 invoked by uid 99); 31 Jan 2018 15:57:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Jan 2018 15:57:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A785F4DBC; Wed, 31 Jan 2018 15:57:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Wed, 31 Jan 2018 15:57:37 -0000 Message-Id: <1ce699daa6a44a74b9be70093600a4e2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/32] hadoop git commit: YARN-7653. Node group support for AllocationTagsManager. (Panagiotis Garefalakis via asuresh) YARN-7653. Node group support for AllocationTagsManager. (Panagiotis Garefalakis via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37f1a7b6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37f1a7b6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37f1a7b6 Branch: refs/heads/trunk Commit: 37f1a7b64fcc93191367330cd59d4d71d7b29ac7 Parents: 06eb63e Author: Arun Suresh Authored: Fri Dec 22 07:24:37 2017 -0800 Committer: Arun Suresh Committed: Wed Jan 31 01:30:17 2018 -0800 ---------------------------------------------------------------------- .../server/resourcemanager/ResourceManager.java | 2 +- .../constraint/AllocationTagsManager.java | 282 ++++++++++++++----- .../rmcontainer/TestRMContainerImpl.java | 2 +- .../constraint/TestAllocationTagsManager.java | 269 ++++++++++++------ 4 files changed, 392 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/37f1a7b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a1d3dfc..1d838f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -496,7 +496,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected AllocationTagsManager createAllocationTagsManager() { - return new AllocationTagsManager(); + return new AllocationTagsManager(this.rmContext); } protected DelegationTokenRenewer createDelegationTokenRenewer() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/37f1a7b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index c278606..7b0b959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; import java.util.HashMap; @@ -38,9 +39,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongBinaryOperator; /** - * Support storing maps between container-tags/applications and - * nodes. This will be required by affinity/anti-affinity implementation and - * cardinality. + * In-memory mapping between applications/container-tags and nodes/racks. + * Required by constrained affinity/anti-affinity and cardinality placement. */ @InterfaceAudience.Private @InterfaceStability.Unstable @@ -51,48 +51,54 @@ public class AllocationTagsManager { private ReentrantReadWriteLock.ReadLock readLock; private ReentrantReadWriteLock.WriteLock writeLock; + private final RMContext rmContext; - // Application's tags to node - private Map perAppMappings = + // Application's tags to Node + private Map perAppNodeMappings = + new HashMap<>(); + // Application's tags to Rack + private Map perAppRackMappings = new HashMap<>(); // Global tags to node mapping (used to fast return aggregated tags // cardinality across apps) - private NodeToCountedTags globalMapping = new NodeToCountedTags(); + private NodeToCountedTags globalNodeMapping = new NodeToCountedTags(); + // Global tags to Rack mapping + private NodeToCountedTags globalRackMapping = new NodeToCountedTags(); /** - * Store node to counted tags. + * Generic store mapping type to counted tags. + * Currently used both for NodeId to Tag, Count and Rack to Tag, Count */ @VisibleForTesting - static class NodeToCountedTags { - // Map> - private Map> nodeToTagsWithCount = - new HashMap<>(); + static class NodeToCountedTags { + // Map> + private Map> typeToTagsWithCount = new HashMap<>(); // protected by external locks - private void addTagsToNode(NodeId nodeId, Set tags) { - Map innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, - k -> new HashMap<>()); + private void addTags(T type, Set tags) { + Map innerMap = + typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>()); for (String tag : tags) { Long count = innerMap.get(tag); if (count == null) { innerMap.put(tag, 1L); - } else{ + } else { innerMap.put(tag, count + 1); } } } // protected by external locks - private void addTagToNode(NodeId nodeId, String tag) { - Map innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, - k -> new HashMap<>()); + private void addTag(T type, String tag) { + Map innerMap = + typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>()); Long count = innerMap.get(tag); if (count == null) { innerMap.put(tag, 1L); - } else{ + } else { innerMap.put(tag, count + 1); } } @@ -104,17 +110,17 @@ public class AllocationTagsManager { } else { if (count <= 0) { LOG.warn( - "Trying to remove tags from node, however the count already" + "Trying to remove tags from node/rack, however the count already" + " becomes 0 or less, it could be a potential bug."); } innerMap.remove(tag); } } - private void removeTagsFromNode(NodeId nodeId, Set tags) { - Map innerMap = nodeToTagsWithCount.get(nodeId); + private void removeTags(T type, Set tags) { + Map innerMap = typeToTagsWithCount.get(type); if (innerMap == null) { - LOG.warn("Failed to find node=" + nodeId + LOG.warn("Failed to find node/rack=" + type + " while trying to remove tags, please double check."); return; } @@ -124,14 +130,14 @@ public class AllocationTagsManager { } if (innerMap.isEmpty()) { - nodeToTagsWithCount.remove(nodeId); + typeToTagsWithCount.remove(type); } } - private void removeTagFromNode(NodeId nodeId, String tag) { - Map innerMap = nodeToTagsWithCount.get(nodeId); + private void removeTag(T type, String tag) { + Map innerMap = typeToTagsWithCount.get(type); if (innerMap == null) { - LOG.warn("Failed to find node=" + nodeId + LOG.warn("Failed to find node/rack=" + type + " while trying to remove tags, please double check."); return; } @@ -139,12 +145,12 @@ public class AllocationTagsManager { removeTagFromInnerMap(innerMap, tag); if (innerMap.isEmpty()) { - nodeToTagsWithCount.remove(nodeId); + typeToTagsWithCount.remove(type); } } - private long getCardinality(NodeId nodeId, String tag) { - Map innerMap = nodeToTagsWithCount.get(nodeId); + private long getCardinality(T type, String tag) { + Map innerMap = typeToTagsWithCount.get(type); if (innerMap == null) { return 0; } @@ -152,9 +158,9 @@ public class AllocationTagsManager { return value == null ? 0 : value; } - private long getCardinality(NodeId nodeId, Set tags, + private long getCardinality(T type, Set tags, LongBinaryOperator op) { - Map innerMap = nodeToTagsWithCount.get(nodeId); + Map innerMap = typeToTagsWithCount.get(type); if (innerMap == null) { return 0; } @@ -193,29 +199,40 @@ public class AllocationTagsManager { } private boolean isEmpty() { - return nodeToTagsWithCount.isEmpty(); + return typeToTagsWithCount.isEmpty(); } @VisibleForTesting - public Map> getNodeToTagsWithCount() { - return nodeToTagsWithCount; + public Map> getTypeToTagsWithCount() { + return typeToTagsWithCount; } } @VisibleForTesting - Map getPerAppMappings() { - return perAppMappings; + Map getPerAppNodeMappings() { + return perAppNodeMappings; + } + + @VisibleForTesting + Map getPerAppRackMappings() { + return perAppRackMappings; + } + + @VisibleForTesting + NodeToCountedTags getGlobalNodeMapping() { + return globalNodeMapping; } @VisibleForTesting - NodeToCountedTags getGlobalMapping() { - return globalMapping; + NodeToCountedTags getGlobalRackMapping() { + return globalRackMapping; } - public AllocationTagsManager() { + public AllocationTagsManager(RMContext context) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); + rmContext = context; } /** @@ -243,21 +260,30 @@ public class AllocationTagsManager { writeLock.lock(); try { - NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( - applicationId, k -> new NodeToCountedTags()); - + NodeToCountedTags perAppTagsMapping = perAppNodeMappings + .computeIfAbsent(applicationId, k -> new NodeToCountedTags()); + NodeToCountedTags perAppRackTagsMapping = perAppRackMappings + .computeIfAbsent(applicationId, k -> new NodeToCountedTags()); + // Covering test-cases where context is mocked + String nodeRack = (rmContext.getRMNodes() != null + && rmContext.getRMNodes().get(nodeId) != null) + ? rmContext.getRMNodes().get(nodeId).getRackName() + : "default-rack"; if (useSet) { - perAppTagsMapping.addTagsToNode(nodeId, allocationTags); - globalMapping.addTagsToNode(nodeId, allocationTags); + perAppTagsMapping.addTags(nodeId, allocationTags); + perAppRackTagsMapping.addTags(nodeRack, allocationTags); + globalNodeMapping.addTags(nodeId, allocationTags); + globalRackMapping.addTags(nodeRack, allocationTags); } else { - perAppTagsMapping.addTagToNode(nodeId, applicationIdTag); - globalMapping.addTagToNode(nodeId, applicationIdTag); + perAppTagsMapping.addTag(nodeId, applicationIdTag); + perAppRackTagsMapping.addTag(nodeRack, applicationIdTag); + globalNodeMapping.addTag(nodeId, applicationIdTag); + globalRackMapping.addTag(nodeRack, applicationIdTag); } if (LOG.isDebugEnabled()) { - LOG.debug( - "Added container=" + containerId + " with tags=[" + StringUtils - .join(allocationTags, ",") + "]"); + LOG.debug("Added container=" + containerId + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); } } finally { writeLock.unlock(); @@ -287,27 +313,40 @@ public class AllocationTagsManager { writeLock.lock(); try { - NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId); + NodeToCountedTags perAppTagsMapping = + perAppNodeMappings.get(applicationId); + NodeToCountedTags perAppRackTagsMapping = + perAppRackMappings.get(applicationId); if (perAppTagsMapping == null) { return; } - + // Covering test-cases where context is mocked + String nodeRack = (rmContext.getRMNodes() != null + && rmContext.getRMNodes().get(nodeId) != null) + ? rmContext.getRMNodes().get(nodeId).getRackName() + : "default-rack"; if (useSet) { - perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); - globalMapping.removeTagsFromNode(nodeId, allocationTags); + perAppTagsMapping.removeTags(nodeId, allocationTags); + perAppRackTagsMapping.removeTags(nodeRack, allocationTags); + globalNodeMapping.removeTags(nodeId, allocationTags); + globalRackMapping.removeTags(nodeRack, allocationTags); } else { - perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag); - globalMapping.removeTagFromNode(nodeId, applicationIdTag); + perAppTagsMapping.removeTag(nodeId, applicationIdTag); + perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag); + globalNodeMapping.removeTag(nodeId, applicationIdTag); + globalRackMapping.removeTag(nodeRack, applicationIdTag); } if (perAppTagsMapping.isEmpty()) { - perAppMappings.remove(applicationId); + perAppNodeMappings.remove(applicationId); + } + if (perAppRackTagsMapping.isEmpty()) { + perAppRackMappings.remove(applicationId); } if (LOG.isDebugEnabled()) { - LOG.debug( - "Removed container=" + containerId + " with tags=[" + StringUtils - .join(allocationTags, ",") + "]"); + LOG.debug("Removed container=" + containerId + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); } } finally { writeLock.unlock(); @@ -315,18 +354,16 @@ public class AllocationTagsManager { } /** - * Get cardinality for following conditions. External can pass-in a binary op - * to implement customized logic. * + * Get Node cardinality for a specific tag. + * When applicationId is null, method returns aggregated cardinality + * * @param nodeId nodeId, required. * @param applicationId applicationId. When null is specified, return * aggregated cardinality among all nodes. * @param tag allocation tag, see * {@link SchedulingRequest#getAllocationTags()}, - * When multiple tags specified. Returns cardinality - * depends on op. If a specified tag doesn't exist, - * 0 will be its cardinality. - * When null/empty tags specified, all tags - * (of the node/app) will be considered. + * If a specified tag doesn't exist, + * method returns 0. * @return cardinality of specified query on the node. * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified @@ -338,14 +375,14 @@ public class AllocationTagsManager { try { if (nodeId == null) { throw new InvalidAllocationTagsQueryException( - "Must specify nodeId/tags/op to query cardinality"); + "Must specify nodeId/tag to query cardinality"); } NodeToCountedTags mapping; if (applicationId != null) { - mapping = perAppMappings.get(applicationId); - } else{ - mapping = globalMapping; + mapping = perAppNodeMappings.get(applicationId); + } else { + mapping = globalNodeMapping; } if (mapping == null) { @@ -359,11 +396,54 @@ public class AllocationTagsManager { } /** + * Get Rack cardinality for a specific tag. + * + * @param rack rack, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all nodes. + * @param tag allocation tag, see + * {@link SchedulingRequest#getAllocationTags()}, + * If a specified tag doesn't exist, + * method returns 0. + * @return cardinality of specified query on the rack. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getRackCardinality(String rack, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (rack == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify rack/tag to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppRackMappings.get(applicationId); + } else { + mapping = globalRackMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(rack, tag); + } finally { + readLock.unlock(); + } + } + + + + /** * Check if given tag exists on node. * * @param nodeId nodeId, required. * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all nodes. + * aggregation among all applications. * @param tag allocation tag, see * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality @@ -387,7 +467,7 @@ public class AllocationTagsManager { * * @param nodeId nodeId, required. * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all nodes. + * aggregated cardinality among all applications. * @param tags allocation tags, see * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality @@ -396,7 +476,7 @@ public class AllocationTagsManager { * specified, all tags (of the node/app) will be * considered. * @param op operator. Such as Long::max, Long::sum, etc. Required. - * This sparameter only take effect when #values >= 2. + * This parameter only take effect when #values >= 2. * @return cardinality of specified query on the node. * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified @@ -414,9 +494,9 @@ public class AllocationTagsManager { NodeToCountedTags mapping; if (applicationId != null) { - mapping = perAppMappings.get(applicationId); - } else{ - mapping = globalMapping; + mapping = perAppNodeMappings.get(applicationId); + } else { + mapping = globalNodeMapping; } if (mapping == null) { @@ -428,4 +508,52 @@ public class AllocationTagsManager { readLock.unlock(); } } + + /** + * Get cardinality for following conditions. External can pass-in a binary op + * to implement customized logic. + * + * @param rack rack, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all applications. + * @param tags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()}, + * When multiple tags specified. Returns cardinality + * depends on op. If a specified tag doesn't exist, 0 + * will be its cardinality. When null/empty tags + * specified, all tags (of the rack/app) will be + * considered. + * @param op operator. Such as Long::max, Long::sum, etc. Required. + * This parameter only take effect when #values >= 2. + * @return cardinality of specified query on the rack. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getRackCardinalityByOp(String rack, ApplicationId applicationId, + Set tags, LongBinaryOperator op) + throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (rack == null || op == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify rack/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppRackMappings.get(applicationId); + } else { + mapping = globalRackMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(rack, tags, op); + } finally { + readLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37f1a7b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 538d128..b927870 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -405,8 +405,8 @@ public class TestRMContainerImpl { RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); - AllocationTagsManager tagsManager = new AllocationTagsManager(); RMContext rmContext = mock(RMContext.class); + AllocationTagsManager tagsManager = new AllocationTagsManager(rmContext); when(rmContext.getDispatcher()).thenReturn(drainDispatcher); when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/37f1a7b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index 4bb2a18..0ce1614 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -20,202 +20,300 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; -import com.google.common.collect.ImmutableSet; +import java.util.List; + import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableSet; + /** * Test functionality of AllocationTagsManager. */ public class TestAllocationTagsManager { + private RMContext rmContext; + + @Before + public void setup() { + MockRM rm = new MockRM(); + rm.start(); + MockNodes.resetHostIds(); + List rmNodes = + MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode); + } + rmContext = rm.getRMContext(); + } + + @Test public void testAllocationTagsManagerSimpleCases() throws InvalidAllocationTagsQueryException { - AllocationTagsManager atm = new AllocationTagsManager(); + + AllocationTagsManager atm = new AllocationTagsManager(rmContext); /** * Construct test case: - * Node1: + * Node1 (rack0): * container_1_1 (mapper/reducer/app_1) * container_1_3 (service/app_1) * - * Node2: + * Node2 (rack0): * container_1_2 (mapper/reducer/app_1) * container_1_4 (reducer/app_1) * container_2_1 (service/app_2) */ // 3 Containers from app1 - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); // 1 Container from app2 - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); - // Get Cardinality of app1 on node1, with tag "mapper" + // Get Node Cardinality of app1 on node1, with tag "mapper" Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), Long::max)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min + // Get Rack Cardinality of app1 on rack0, with tag "mapper" + Assert.assertEquals(2, atm.getRackCardinality("rack0", + TestUtils.getMockApplicationId(1), "mapper")); + + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::min)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max Assert.assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::max)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum Assert.assertEquals(3, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::sum)); - // Get Cardinality by passing single tag. + // Get Node Cardinality by passing single tag. Assert.assertEquals(1, - atm.getNodeCardinality(NodeId.fromString("node2:1234"), + atm.getNodeCardinality(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), "mapper")); Assert.assertEquals(2, - atm.getNodeCardinality(NodeId.fromString("node2:1234"), + atm.getNodeCardinality(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), "reducer")); - // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min + // Get Node Cardinality of app1 on node2, with tag "no_existed/reducer", + // op=min Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("no_existed", "reducer"), Long::min)); - // Get Cardinality of app1 on node2, with tag "", op=max + // Get Node Cardinality of app1 on node2, with tag "", op=max // (Expect this returns #containers from app1 on node2) + Assert + .assertEquals(2, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of(AllocationTagsNamespaces.APP_ID + + TestUtils.getMockApplicationId(1).toString()), + Long::max)); + + // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), - TestUtils.getMockApplicationId(1), ImmutableSet - .of(AllocationTagsNamespaces.APP_ID + TestUtils - .getMockApplicationId(1).toString()), Long::max)); - - // Get Cardinality of app1 on node2, with empty tag set, op=max - Assert.assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); - // Get Cardinality of all apps on node2, with empty tag set, op=sum - Assert.assertEquals(7, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null, - ImmutableSet.of(), Long::sum)); + // Get Node Cardinality of all apps on node2, with empty tag set, op=sum + Assert.assertEquals(7, atm.getNodeCardinalityByOp( + NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum)); - // Get Cardinality of app_1 on node2, with empty tag set, op=sum + // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(5, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); - // Get Cardinality of app_1 on node2, with empty tag set, op=sum + // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); // Finish all containers: - atm.removeContainer(NodeId.fromString("node1:1234"), + atm.removeContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.removeContainer(NodeId.fromString("node1:1234"), + atm.removeContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Expect all cardinality to be 0 // Get Cardinality of app1 on node1, with tag "mapper" Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), Long::max)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::min)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::max)); - // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum + // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper", "reducer"), Long::sum)); - // Get Cardinality of app1 on node2, with tag "", op=max + // Get Node Cardinality of app1 on node2, with tag "", op=max // (Expect this returns #containers from app1 on node2) Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()), Long::max)); Assert.assertEquals(0, - atm.getNodeCardinality(NodeId.fromString("node2:1234"), + atm.getNodeCardinality(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockApplicationId(1).toString())); - // Get Cardinality of app1 on node2, with empty tag set, op=max + // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); - // Get Cardinality of all apps on node2, with empty tag set, op=sum - Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null, - ImmutableSet.of(), Long::sum)); + // Get Node Cardinality of all apps on node2, with empty tag set, op=sum + Assert.assertEquals(0, atm.getNodeCardinalityByOp( + NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum)); - // Get Cardinality of app_1 on node2, with empty tag set, op=sum + // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); - // Get Cardinality of app_1 on node2, with empty tag set, op=sum + // Get Node Cardinality of app_2 on node2, with empty tag set, op=sum Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); } + + @Test + public void testAllocationTagsManagerRackMapping() + throws InvalidAllocationTagsQueryException { + + AllocationTagsManager atm = new AllocationTagsManager(rmContext); + + /** + * Construct Rack test case: + * Node1 (rack0): + * container_1_1 (mapper/reducer/app_1) + * container_1_4 (reducer/app_2) + * + * Node2 (rack0): + * container_1_2 (mapper/reducer/app_2) + * container_1_3 (service/app_1) + * + * Node5 (rack1): + * container_2_1 (service/app_2) + */ + + // 3 Containers from app1 + atm.addContainer(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + atm.addContainer(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2), + ImmutableSet.of("mapper", "reducer")); + + atm.addContainer(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4), + ImmutableSet.of("reducer")); + + atm.addContainer(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + // 1 Container from app2 + atm.addContainer(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // Get Rack Cardinality of app1 on rack0, with tag "mapper" + Assert.assertEquals(1, atm.getRackCardinality("rack0", + TestUtils.getMockApplicationId(1), "mapper")); + + // Get Rack Cardinality of app2 on rack0, with tag "reducer" + Assert.assertEquals(2, atm.getRackCardinality("rack0", + TestUtils.getMockApplicationId(2), "reducer")); + + // Get Rack Cardinality of all apps on rack0, with tag "reducer" + Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer")); + + // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max + Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0", + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + + // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min + Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0", + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min)); + + // Get Rack Cardinality of all apps on rack0, with empty tag set, op=min + Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null, + ImmutableSet.of(), Long::max)); + } + @Test public void testAllocationTagsManagerMemoryAfterCleanup() throws InvalidAllocationTagsQueryException { @@ -223,54 +321,57 @@ public class TestAllocationTagsManager { * Make sure YARN cleans up all memory once container/app finishes. */ - AllocationTagsManager atm = new AllocationTagsManager(); + AllocationTagsManager atm = new AllocationTagsManager(rmContext); // Add a bunch of containers - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Remove all these containers - atm.removeContainer(NodeId.fromString("node1:1234"), + atm.removeContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.removeContainer(NodeId.fromString("node1:1234"), + atm.removeContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); - atm.removeContainer(NodeId.fromString("node2:1234"), + atm.removeContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Check internal data structure Assert.assertEquals(0, - atm.getGlobalMapping().getNodeToTagsWithCount().size()); - Assert.assertEquals(0, atm.getPerAppMappings().size()); + atm.getGlobalNodeMapping().getTypeToTagsWithCount().size()); + Assert.assertEquals(0, atm.getPerAppNodeMappings().size()); + Assert.assertEquals(0, + atm.getGlobalRackMapping().getTypeToTagsWithCount().size()); + Assert.assertEquals(0, atm.getPerAppRackMappings().size()); } @Test @@ -280,26 +381,26 @@ public class TestAllocationTagsManager { * Make sure YARN cleans up all memory once container/app finishes. */ - AllocationTagsManager atm = new AllocationTagsManager(); + AllocationTagsManager atm = new AllocationTagsManager(rmContext); // Add a bunch of containers - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("node1:1234"), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); - atm.addContainer(NodeId.fromString("node2:1234"), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); @@ -317,7 +418,7 @@ public class TestAllocationTagsManager { // No op caughtException = false; try { - atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null); } catch (InvalidAllocationTagsQueryException e) { caughtException = true; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org