Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A49E9200D49 for ; Fri, 20 Oct 2017 00:40:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A34361609EE; Thu, 19 Oct 2017 22:40:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9A975160BEE for ; Fri, 20 Oct 2017 00:40:45 +0200 (CEST) Received: (qmail 40201 invoked by uid 500); 19 Oct 2017 22:40:41 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 39010 invoked by uid 99); 19 Oct 2017 22:40:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Oct 2017 22:40:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 01513E0007; Thu, 19 Oct 2017 22:40:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: templedf@apache.org To: common-commits@hadoop.apache.org Date: Thu, 19 Oct 2017 22:40:53 -0000 Message-Id: <9f4f8cee2bf54d5996ae59ee76e3e7ea@git.apache.org> In-Reply-To: <6a8df4a7ec5a483495c54f5e5661004d@git.apache.org> References: <6a8df4a7ec5a483495c54f5e5661004d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/50] [abbrv] hadoop git commit: YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda) archived-at: Thu, 19 Oct 2017 22:40:46 -0000 YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda) (cherry picked from commit 32c91223f1bd06561ea4ce2d1944e8d9a847f18c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8522a568 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8522a568 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8522a568 Branch: refs/heads/resource-types Commit: 8522a5689b5ac78593bba4ace148d47bf4d25d4c Parents: d1c4565 Author: Wangda Tan Authored: Fri Jan 29 10:53:31 2016 +0800 Committer: Daniel Templeton Committed: Thu Oct 19 15:39:58 2017 -0700 ---------------------------------------------------------------------- .../resource/DominantResourceCalculator.java | 380 +++++++++++++------ 1 file changed, 273 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8522a568/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 7697e1d..a94e7a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -22,25 +22,31 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; + +import java.util.HashSet; +import java.util.Set; /** - * A {@link ResourceCalculator} which uses the concept of + * A {@link ResourceCalculator} which uses the concept of * dominant resource to compare multi-dimensional resources. * - * Essentially the idea is that the in a multi-resource environment, - * the resource allocation should be determined by the dominant share - * of an entity (user or queue), which is the maximum share that the - * entity has been allocated of any resource. - * - * In a nutshell, it seeks to maximize the minimum dominant share across - * all entities. - * + * Essentially the idea is that the in a multi-resource environment, + * the resource allocation should be determined by the dominant share + * of an entity (user or queue), which is the maximum share that the + * entity has been allocated of any resource. + * + * In a nutshell, it seeks to maximize the minimum dominant share across + * all entities. + * * For example, if user A runs CPU-heavy tasks and user B runs - * memory-heavy tasks, it attempts to equalize CPU share of user A - * with Memory-share of user B. - * + * memory-heavy tasks, it attempts to equalize CPU share of user A + * with Memory-share of user B. + * * In the single resource case, it reduces to max-min fairness for that resource. - * + * * See the Dominant Resource Fairness paper for more details: * www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf */ @@ -50,6 +56,56 @@ public class DominantResourceCalculator extends ResourceCalculator { private static final Log LOG = LogFactory.getLog(DominantResourceCalculator.class); + + private Set resourceNames; + + public DominantResourceCalculator() { + resourceNames = new HashSet<>(); + resourceNames.add(ResourceInformation.MEMORY.getName()); + resourceNames.add(ResourceInformation.VCORES.getName()); + } + + /** + * Compare two resources - if the value for every resource type for the lhs + * is greater than that of the rhs, return 1. If the value for every resource + * type in the lhs is less than the rhs, return -1. Otherwise, return 0 + * + * @param lhs resource to be compared + * @param rhs resource to be compared + * @return 0, 1, or -1 + */ + private int compare(Resource lhs, Resource rhs) { + boolean lhsGreater = false; + boolean rhsGreater = false; + int ret = 0; + + for (String rName : resourceNames) { + try { + ResourceInformation lhsResourceInformation = + lhs.getResourceInformation(rName); + ResourceInformation rhsResourceInformation = + rhs.getResourceInformation(rName); + int diff = lhsResourceInformation.compareTo(rhsResourceInformation); + if (diff >= 1) { + lhsGreater = true; + } else if (diff <= -1) { + rhsGreater = true; + } + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + rName, ye); + } + } + if (lhsGreater && rhsGreater) { + ret = 0; + } else if (lhsGreater) { + ret = 1; + } else if (rhsGreater) { + ret = -1; + } + return ret; + } + @Override public int compare(Resource clusterResource, Resource lhs, Resource rhs, boolean singleType) { @@ -57,25 +113,14 @@ public class DominantResourceCalculator extends ResourceCalculator { if (lhs.equals(rhs)) { return 0; } - + if (isInvalidDivisor(clusterResource)) { - if ((lhs.getMemorySize() < rhs.getMemorySize() && - lhs.getVirtualCores() > rhs.getVirtualCores()) || - (lhs.getMemorySize() > rhs.getMemorySize() && - lhs.getVirtualCores() < rhs.getVirtualCores())) { - return 0; - } else if (lhs.getMemorySize() > rhs.getMemorySize() - || lhs.getVirtualCores() > rhs.getVirtualCores()) { - return 1; - } else if (lhs.getMemorySize() < rhs.getMemorySize() - || lhs.getVirtualCores() < rhs.getVirtualCores()) { - return -1; - } + return this.compare(lhs, rhs); } float l = getResourceAsValue(clusterResource, lhs, true); float r = getResourceAsValue(clusterResource, rhs, true); - + if (l < r) { return -1; } else if (l > r) { @@ -83,75 +128,142 @@ public class DominantResourceCalculator extends ResourceCalculator { } else if (!singleType) { l = getResourceAsValue(clusterResource, lhs, false); r = getResourceAsValue(clusterResource, rhs, false); + if (l < r) { return -1; } else if (l > r) { return 1; } } - + return 0; } /** * Use 'dominant' for now since we only have 2 resources - gives us a slight * performance boost. - * + *

* Once we add more resources, we'll need a more complicated (and slightly * less performant algorithm). */ - protected float getResourceAsValue( - Resource clusterResource, Resource resource, boolean dominant) { - // Just use 'dominant' resource - return (dominant) ? - Math.max( - (float)resource.getMemorySize() / clusterResource.getMemorySize(), - (float)resource.getVirtualCores() / clusterResource.getVirtualCores() - ) - : - Math.min( - (float)resource.getMemorySize() / clusterResource.getMemorySize(), - (float)resource.getVirtualCores() / clusterResource.getVirtualCores() - ); - } - + protected float getResourceAsValue(Resource clusterResource, + Resource resource, boolean dominant) { + + float min = Float.MAX_VALUE; + float max = 0.0f; + for (String rName : resourceNames) { + try { + ResourceInformation clusterResourceResourceInformation = + clusterResource.getResourceInformation(rName); + ResourceInformation resourceInformation = + resource.getResourceInformation(rName); + Long resourceValue = UnitsConversionUtil + .convert(resourceInformation.getUnits(), + clusterResourceResourceInformation.getUnits(), + resourceInformation.getValue()); + float tmp = + (float) resourceValue / (float) clusterResourceResourceInformation + .getValue(); + min = min < tmp ? min : tmp; + max = max > tmp ? max : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return (dominant) ? max : min; + } + @Override public long computeAvailableContainers(Resource available, Resource required) { - return Math.min( - available.getMemorySize() / required.getMemorySize(), - available.getVirtualCores() / required.getVirtualCores()); + long min = Long.MAX_VALUE; + for (String resource : resourceNames) { + try { + ResourceInformation availableResource = + available.getResourceInformation(resource); + ResourceInformation requiredResource = + required.getResourceInformation(resource); + Long requiredResourceValue = UnitsConversionUtil + .convert(requiredResource.getUnits(), availableResource.getUnits(), + requiredResource.getValue()); + Long tmp = availableResource.getValue() / requiredResourceValue; + min = min < tmp ? min : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + + } + return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min; } @Override - public float divide(Resource clusterResource, + public float divide(Resource clusterResource, Resource numerator, Resource denominator) { - return - getResourceAsValue(clusterResource, numerator, true) / + return + getResourceAsValue(clusterResource, numerator, true) / getResourceAsValue(clusterResource, denominator, true); } - + @Override public boolean isInvalidDivisor(Resource r) { - if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) { - return true; + for (String resource : resourceNames) { + try { + if (r.getResourceValue(resource).equals(0L)) { + return true; + } + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource value for " + resource, ye); + } } return false; } @Override public float ratio(Resource a, Resource b) { - return Math.max( - (float)a.getMemorySize()/b.getMemorySize(), - (float)a.getVirtualCores()/b.getVirtualCores() - ); + float ratio = 0.0f; + for (String resource : resourceNames) { + try { + ResourceInformation aResourceInformation = + a.getResourceInformation(resource); + ResourceInformation bResourceInformation = + b.getResourceInformation(resource); + Long bResourceValue = UnitsConversionUtil + .convert(bResourceInformation.getUnits(), + aResourceInformation.getUnits(), + bResourceInformation.getValue()); + float tmp = + (float) aResourceInformation.getValue() / (float) bResourceValue; + ratio = ratio > tmp ? ratio : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ratio; } @Override public Resource divideAndCeil(Resource numerator, int denominator) { - return Resources.createResource( - divideAndCeil(numerator.getMemorySize(), denominator), - divideAndCeil(numerator.getVirtualCores(), denominator) - ); + return divideAndCeil(numerator, (long) denominator); + } + + public Resource divideAndCeil(Resource numerator, long denominator) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation resourceInformation = ResourceInformation + .newInstance(numerator.getResourceInformation(resource)); + resourceInformation.setValue( + divideAndCeil(resourceInformation.getValue(), denominator)); + ret.setResourceInformation(resource, resourceInformation); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override @@ -164,73 +276,127 @@ public class DominantResourceCalculator extends ResourceCalculator { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource, Resource stepFactor) { - if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) { - Resource step = Resources.clone(stepFactor); - if (stepFactor.getMemorySize() == 0) { - LOG.error("Memory cannot be allocated in increments of zero. Assuming " - + minimumResource.getMemorySize() + "MB increment size. " - + "Please ensure the scheduler configuration is correct."); - step.setMemorySize(minimumResource.getMemorySize()); - } + Resource maximumResource, Resource stepFactor) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation minimumResourceInformation = + minimumResource.getResourceInformation(resource); + ResourceInformation maximumResourceInformation = + maximumResource.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); - if (stepFactor.getVirtualCores() == 0) { - LOG.error("VCore cannot be allocated in increments of zero. Assuming " - + minimumResource.getVirtualCores() + "VCores increment size. " - + "Please ensure the scheduler configuration is correct."); - step.setVirtualCores(minimumResource.getVirtualCores()); - } + Long rValue = rResourceInformation.getValue(); + Long minimumValue = UnitsConversionUtil + .convert(minimumResourceInformation.getUnits(), + rResourceInformation.getUnits(), + minimumResourceInformation.getValue()); + Long maximumValue = UnitsConversionUtil + .convert(maximumResourceInformation.getUnits(), + rResourceInformation.getUnits(), + maximumResourceInformation.getValue()); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); - stepFactor = step; + tmp.setValue( + Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue), + maximumValue)); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } } - - long normalizedMemory = Math.min( - roundUp( - Math.max(r.getMemorySize(), minimumResource.getMemorySize()), - stepFactor.getMemorySize()), - maximumResource.getMemorySize()); - int normalizedCores = Math.min( - roundUp( - Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), - stepFactor.getVirtualCores()), - maximumResource.getVirtualCores()); - return Resources.createResource(normalizedMemory, - normalizedCores); + return ret; } @Override public Resource roundUp(Resource r, Resource stepFactor) { - return Resources.createResource( - roundUp(r.getMemorySize(), stepFactor.getMemorySize()), - roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()) - ); + return this.rounding(r, stepFactor, true); } @Override public Resource roundDown(Resource r, Resource stepFactor) { - return Resources.createResource( - roundDown(r.getMemorySize(), stepFactor.getMemorySize()), - roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()) - ); + return this.rounding(r, stepFactor, false); + } + + private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); + + Long rValue = rResourceInformation.getValue(); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); + + Long value = roundUp ? roundUp(rValue, stepFactorValue) : + roundDown(rValue, stepFactorValue); + tmp.setValue(value); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override public Resource multiplyAndNormalizeUp(Resource r, double by, Resource stepFactor) { - return Resources.createResource( - roundUp((long) Math.ceil((float) (r.getMemorySize() * by)), - stepFactor.getMemorySize()), - roundUp((int) Math.ceil((float) (r.getVirtualCores() * by)), - stepFactor.getVirtualCores())); + return this.multiplyAndNormalize(r, by, stepFactor, true); } @Override public Resource multiplyAndNormalizeDown(Resource r, double by, Resource stepFactor) { - return Resources.createResource( - roundDown((long) (r.getMemorySize() * by), stepFactor.getMemorySize()), - roundDown((int) (r.getVirtualCores() * by), - stepFactor.getVirtualCores())); + return this.multiplyAndNormalize(r, by, stepFactor, false); + } + + private Resource multiplyAndNormalize(Resource r, double by, + Resource stepFactor, boolean roundUp) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); + + Long rValue = rResourceInformation.getValue(); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); + + Long value = + roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) : + roundDown((long) (rValue * by), stepFactorValue); + tmp.setValue(value); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org