hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject [2/2] hadoop git commit: YARN-5164. Use plan RLE to improve CapacityOverTimePolicy efficiency
Date Mon, 25 Jul 2016 23:38:54 GMT
YARN-5164. Use plan RLE to improve CapacityOverTimePolicy efficiency


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e2f75f8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e2f75f8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e2f75f8

Branch: refs/heads/branch-3.0.0-alpha1
Commit: 8e2f75f81193d9d12b8f0632d556bdc677b2d198
Parents: eecba92
Author: Chris Douglas <cdouglas@apache.org>
Authored: Mon Jul 25 16:37:50 2016 -0700
Committer: Chris Douglas <cdouglas@apache.org>
Committed: Mon Jul 25 16:38:09 2016 -0700

----------------------------------------------------------------------
 .../reservation/CapacityOverTimePolicy.java     | 299 +++++++++++--------
 .../reservation/TestCapacityOverTimePolicy.java |  20 +-
 2 files changed, 185 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e2f75f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index 07bee99..bb1a4e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -1,26 +1,21 @@
 /*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *       http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.Date;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -28,9 +23,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
 /**
  * This policy enforces a time-extended notion of Capacity. In particular it
  * guarantees that the allocation received in input when combined with all
@@ -39,11 +37,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * validWindow, the integral of the allocations for a user (sum of the currently
  * submitted allocation and all prior allocations for the user) does not exceed
  * validWindow * maxAvg.
- * 
+ *
  * This allows flexibility, in the sense that an allocation can instantaneously
  * use large portions of the available capacity, but prevents abuses by bounding
  * the average use over time.
- * 
+ *
  * By controlling maxInst, maxAvg, validWindow the administrator configuring
  * this policy can obtain a behavior ranging from instantaneously enforced
  * capacity (akin to existing queues), or fully flexible allocations (likely
@@ -51,7 +49,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  */
 @LimitedPrivate("yarn")
 @Unstable
-public class CapacityOverTimePolicy implements SharingPolicy {
+public class CapacityOverTimePolicy extends NoOverCommitPolicy {
 
   private ReservationSchedulerConfiguration conf;
   private long validWindow;
@@ -68,123 +66,155 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     validWindow = this.conf.getReservationWindow(reservationQueuePath);
     maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
     maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
-  };
+  }
 
+  /**
+   * The validation algorithm walks over the RLE encoded allocation and
+   * checks that for all transition points (when the start or end of the
+   * checking window encounters a value in the RLE). At this point it
+   * checkes whether the integral computed exceeds the quota limit. Note that
+   * this might not find the exact time of a violation, but if a violation
+   * exists it will find it. The advantage is a much lower number of checks
+   * as compared to time-slot by time-slot checks.
+   *
+   * @param plan the plan to validate against
+   * @param reservation the reservation allocation to test.
+   * @throws PlanningException if the validation fails.
+   */
   @Override
   public void validate(Plan plan, ReservationAllocation reservation)
       throws PlanningException {
 
-    // this is entire method invoked under a write-lock on the plan, no need
-    // to synchronize accesses to the plan further
 
-    // Try to verify whether there is already a reservation with this ID in
-    // the system (remove its contribution during validation to simulate a
-    // try-n-swap
-    // update).
-    ReservationAllocation oldReservation =
+    // rely on NoOverCommitPolicy to check for: 1) user-match, 2) physical
+    // cluster limits, and 3) maxInst (via override of available)
+    try {
+      super.validate(plan, reservation);
+    } catch (PlanningException p) {
+      //wrap it in proper quota exception
+      throw new PlanningQuotaException(p);
+    }
+
+    //---- check for integral violations of capacity --------
+
+    // Gather a view of what to check (curr allocation of user, minus old
+    // version of this reservation, plus new version)
+    RLESparseResourceAllocation consumptionForUserOverTime =
+        plan.getConsumptionForUserOverTime(reservation.getUser(),
+            reservation.getStartTime() - validWindow,
+            reservation.getEndTime() + validWindow);
+
+    ReservationAllocation old =
         plan.getReservationById(reservation.getReservationId());
+    if (old != null) {
+      consumptionForUserOverTime = RLESparseResourceAllocation
+          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              consumptionForUserOverTime, old.getResourcesOverTime(),
+              RLEOperator.add, reservation.getStartTime() - validWindow,
+              reservation.getEndTime() + validWindow);
+    }
 
-    long startTime = reservation.getStartTime();
-    long endTime = reservation.getEndTime();
-    long step = plan.getStep();
+    RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
 
-    Resource planTotalCapacity = plan.getTotalCapacity();
+    RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+            consumptionForUserOverTime, resRLE, RLEOperator.add, Long.MIN_VALUE,
+            Long.MAX_VALUE);
 
-    Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
-    Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+    NavigableMap<Long, Resource> integralUp = new TreeMap<>();
+    NavigableMap<Long, Resource> integralDown = new TreeMap<>();
 
-    // define variable that will store integral of resources (need diff class to
-    // avoid overflow issues for long/large allocations)
+    long prevTime = toCheck.getEarliestStartTime();
+    IntegralResource prevResource = new IntegralResource(0L, 0L);
     IntegralResource runningTot = new IntegralResource(0L, 0L);
-    IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
-    maxAllowed.multiplyBy(validWindow / step);
-
-    RLESparseResourceAllocation userCons =
-        plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
-            - validWindow, endTime + validWindow);
-
-    // check that the resources offered to the user during any window of length
-    // "validWindow" overlapping this allocation are within maxAllowed
-    // also enforce instantaneous and physical constraints during this pass
-    for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
-
-      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
-      Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
-      Resource currNewAlloc = reservation.getResourcesAtTime(t);
-      Resource currOldAlloc = Resources.none();
-      if (oldReservation != null) {
-        currOldAlloc = oldReservation.getResourcesAtTime(t);
-      }
 
-      // throw exception if the cluster is overcommitted
-      // tot_allocated - old + new > capacity
-      Resource inst =
-          Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
-              currOldAlloc);
-      if (Resources.greaterThan(plan.getResourceCalculator(),
-          planTotalCapacity, inst, planTotalCapacity)) {
-        throw new ResourceOverCommitException(" Resources at time " + t
-            + " would be overcommitted (" + inst + " over "
-            + plan.getTotalCapacity() + ") by accepting reservation: "
-            + reservation.getReservationId());
-      }
+    // add intermediate points
+    Map<Long, Resource> temp = new TreeMap<>();
+    for (Map.Entry<Long, Resource> pointToCheck : toCheck.getCumulative()
+        .entrySet()) {
 
-      // throw exception if instantaneous limits are violated
-      // tot_alloc_to_this_user - old + new > inst_limit
-      if (Resources.greaterThan(plan.getResourceCalculator(),
-          planTotalCapacity, Resources.subtract(
-              Resources.add(currExistingAllocForUser, currNewAlloc),
-              currOldAlloc), maxInsRes)) {
-        throw new PlanningQuotaException("Instantaneous quota capacity "
-            + maxInst + " would be passed at time " + t
-            + " by accepting reservation: " + reservation.getReservationId());
-      }
+      Long timeToCheck = pointToCheck.getKey();
+      Resource resourceToCheck = pointToCheck.getValue();
 
-      // throw exception if the running integral of utilization over validWindow
-      // is violated. We perform a delta check, adding/removing instants at the
-      // boundary of the window from runningTot.
-
-      // runningTot = previous_runningTot + currExistingAllocForUser +
-      // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
-
-      // Where:
-      // 1) currNewAlloc, currExistingAllocForUser represent the contribution of
-      // the instant in time added in this pass.
-      // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
-      // instants that are being retired from the the window
-      // 3) currOldAlloc is the contribution (if any) of the previous version of
-      // this reservation (the one we are updating)
-
-      runningTot.add(currExistingAllocForUser);
-      runningTot.add(currNewAlloc);
-      runningTot.subtract(currOldAlloc);
-
-      // expire contributions from instant in time before (t - validWindow)
-      if (t > startTime) {
-        Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
-        Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
-
-        // runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
-        runningTot.subtract(pastOldAlloc);
-        runningTot.subtract(pastNewAlloc);
+      Long nextPoint = toCheck.getCumulative().higherKey(timeToCheck);
+      if (nextPoint == null || toCheck.getCumulative().get(nextPoint) == null) {
+        continue;
       }
-
-      // check integral
-      // runningTot > maxAvg * validWindow
-      // NOTE: we need to use comparator of IntegralResource directly, as
-      // Resource and ResourceCalculator assume "int" amount of resources,
-      // which is not sufficient when comparing integrals (out-of-bound)
-      if (maxAllowed.compareTo(runningTot) < 0) {
-        throw new PlanningQuotaException(
-            "Integral (avg over time) quota capacity " + maxAvg
-                + " over a window of " + validWindow / 1000 + " seconds, "
-                + " would be passed at time " + t + "(" + new Date(t)
-                + ") by accepting reservation: "
-                + reservation.getReservationId());
+      for (int i = 1; i <= (nextPoint - timeToCheck) / validWindow; i++) {
+        temp.put(timeToCheck + (i * validWindow), resourceToCheck);
+      }
+    }
+    temp.putAll(toCheck.getCumulative());
+
+    // compute point-wise integral for the up-fronts and down-fronts
+    for (Map.Entry<Long, Resource> currPoint : temp.entrySet()) {
+
+      Long currTime = currPoint.getKey();
+      Resource currResource = currPoint.getValue();
+
+      //add to running total current contribution
+      prevResource.multiplyBy(currTime - prevTime);
+      runningTot.add(prevResource);
+      integralUp.put(currTime, normalizeToResource(runningTot, validWindow));
+      integralDown.put(currTime + validWindow,
+          normalizeToResource(runningTot, validWindow));
+
+      if (currResource != null) {
+        prevResource.memory = currResource.getMemorySize();
+        prevResource.vcores = currResource.getVirtualCores();
+      } else {
+        prevResource.memory = 0L;
+        prevResource.vcores = 0L;
       }
+      prevTime = currTime;
+    }
+
+    // compute final integral as delta of up minus down transitions
+    RLESparseResourceAllocation intUp =
+        new RLESparseResourceAllocation(integralUp,
+            plan.getResourceCalculator());
+    RLESparseResourceAllocation intDown =
+        new RLESparseResourceAllocation(integralDown,
+            plan.getResourceCalculator());
+
+    RLESparseResourceAllocation integral = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), intUp,
+            intDown, RLEOperator.subtract, Long.MIN_VALUE, Long.MAX_VALUE);
+
+    // define over-time integral limit
+    // note: this is aligned with the normalization done above
+    NavigableMap<Long, Resource> tlimit = new TreeMap<>();
+    Resource maxAvgRes = Resources.multiply(plan.getTotalCapacity(), maxAvg);
+    tlimit.put(toCheck.getEarliestStartTime() - validWindow, maxAvgRes);
+    RLESparseResourceAllocation targetLimit =
+        new RLESparseResourceAllocation(tlimit, plan.getResourceCalculator());
+
+    // compare using merge() limit with integral
+    try {
+      RLESparseResourceAllocation
+          .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              targetLimit, integral, RLEOperator.subtractTestNonNegative,
+              reservation.getStartTime() - validWindow,
+              reservation.getEndTime() + validWindow);
+    } catch (PlanningException p) {
+      throw new PlanningQuotaException(
+          "Integral (avg over time) quota capacity " + maxAvg
+              + " over a window of " + validWindow / 1000 + " seconds, "
+              + " would be exceeded by accepting reservation: " + reservation
+              .getReservationId(), p);
     }
   }
 
+  private Resource normalizeToResource(IntegralResource runningTot,
+      long window) {
+    // normalize to fit in windows. Rounding should not impact more than
+    // sub 1 core average allocations. This will all be removed once
+    // Resource moves to long.
+    int memory = (int) Math.round((double) runningTot.memory / window);
+    int vcores = (int) Math.round((double) runningTot.vcores / window);
+    return Resource.newInstance(memory, vcores);
+  }
+
   @Override
   public RLESparseResourceAllocation availableResources(
       RLESparseResourceAllocation available, Plan plan, String user,
@@ -208,21 +238,18 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     // add back in old reservation used resources if any
     ReservationAllocation old = plan.getReservationById(oldId);
     if (old != null) {
-      used =
-          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-              Resources.clone(plan.getTotalCapacity()), used,
-              old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+      used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+          Resources.clone(plan.getTotalCapacity()), used,
+          old.getResourcesOverTime(), RLEOperator.subtract, start, end);
     }
 
-    instRLEQuota =
-        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-            planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
-            end);
+    instRLEQuota = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota,
+            used, RLEOperator.subtract, start, end);
 
-    instRLEQuota =
-        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-            planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
-            end);
+    instRLEQuota = RLESparseResourceAllocation
+        .merge(plan.getResourceCalculator(), planTotalCapacity, available,
+            instRLEQuota, RLEOperator.min, start, end);
 
     return instRLEQuota;
   }
@@ -260,11 +287,20 @@ public class CapacityOverTimePolicy implements SharingPolicy {
       vcores += r.getVirtualCores();
     }
 
+    public void add(IntegralResource r) {
+      memory += r.memory;
+      vcores += r.vcores;
+    }
+
     public void subtract(Resource r) {
       memory -= r.getMemorySize();
       vcores -= r.getVirtualCores();
     }
 
+    public IntegralResource negate() {
+      return new IntegralResource(-memory, -vcores);
+    }
+
     public void multiplyBy(long window) {
       memory = memory * window;
       vcores = vcores * window;
@@ -282,8 +318,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     public String toString() {
       return "<memory:" + memory + ", vCores:" + vcores + ">";
     }
-  }
-
 
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e2f75f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 4aed064..2dee60c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -117,6 +116,23 @@ public class TestCapacityOverTimePolicy {
             res, minAlloc), false));
   }
 
+  @Test(expected = PlanningException.class)
+  public void testAllocationLargerThanValidWindow() throws IOException,
+      PlanningException {
+    // generate allocation that exceed the validWindow
+    int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
+
+    ReservationDefinition rDef =
+        ReservationSystemTestUtil.createSimpleReservationDefinition(
+            initTime, initTime + f.length + 1, f.length);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc), false));
+  }
+
   @Test
   public void testSimplePass2() throws IOException, PlanningException {
     // generate allocation from single tenant that exceed avg momentarily but
@@ -151,7 +167,7 @@ public class TestCapacityOverTimePolicy {
     }
   }
 
-  @Test(expected = ResourceOverCommitException.class)
+  @Test(expected = PlanningQuotaException.class)
   public void testMultiTenantFail() throws IOException, PlanningException {
     // generate allocation from multiple tenants that exceed tot capacity
     int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message