hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject [08/16] git commit: YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan. (cherry picked from commit c4918cb4cb5a267a8cfd6eace28fcfe7ad6174e8)
Date Sat, 04 Oct 2014 00:10:06 GMT
YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan.
(cherry picked from commit c4918cb4cb5a267a8cfd6eace28fcfe7ad6174e8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b6df0ddd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b6df0ddd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b6df0ddd

Branch: refs/heads/trunk
Commit: b6df0dddcdafd7ec67c76ea92aea3ff3e94db247
Parents: f66ffcf
Author: carlo curino <Carlo Curino>
Authored: Tue Sep 16 13:20:57 2014 -0700
Committer: Chris Douglas <cdouglas@apache.org>
Committed: Fri Oct 3 15:42:03 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   3 +
 .../reservation/CapacityOverTimePolicy.java     | 231 +++++++++++++++++++
 .../reservation/NoOverCommitPolicy.java         |  74 ++++++
 .../reservation/SharingPolicy.java              |  49 ++++
 .../exceptions/ContractValidationException.java |   9 +-
 .../exceptions/MismatchedUserException.java     |  28 +++
 .../exceptions/PlanningException.java           |   9 +-
 .../exceptions/PlanningQuotaException.java      |  28 +++
 .../exceptions/ResourceOverCommitException.java |  28 +++
 .../CapacitySchedulerConfiguration.java         | 154 +++++++++++++
 .../reservation/TestCapacityOverTimePolicy.java | 222 ++++++++++++++++++
 .../reservation/TestNoOverCommitPolicy.java     | 144 ++++++++++++
 12 files changed, 977 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index deece7c..e9ec691 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -14,3 +14,6 @@ subru)
 YARN-1710. Logic to find allocations within a Plan that satisfy 
 user ReservationRequest(s). (Carlo Curino and Subru Krishnan via 
 curino) 
+
+YARN-1711. Policy to enforce instantaneous and over-time quotas 
+on user reservations. (Carlo Curino and Subru Krishnan via curino)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
new file mode 100644
index 0000000..38c0207
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -0,0 +1,231 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This policy enforces a time-extended notion of Capacity. In particular it
+ * guarantees that the allocation received in input when combined with all
+ * previous allocation for the user does not violate an instantaneous max limit
+ * on the resources received, and that for every window of time of length
+ * validWindow, the integral of the allocations for a user (sum of the currently
+ * submitted allocation and all prior allocations for the user) does not exceed
+ * validWindow * maxAvg.
+ * 
+ * This allows flexibility, in the sense that an allocation can instantaneously
+ * use large portions of the available capacity, but prevents abuses by bounding
+ * the average use over time.
+ * 
+ * By controlling maxInst, maxAvg, validWindow the administrator configuring
+ * this policy can obtain a behavior ranging from instantaneously enforced
+ * capacity (akin to existing queues), or fully flexible allocations (likely
+ * reserved to super-users, or trusted systems).
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class CapacityOverTimePolicy implements SharingPolicy {
+
+  private CapacitySchedulerConfiguration conf;
+  private long validWindow;
+  private float maxInst;
+  private float maxAvg;
+
+  // For now this is CapacityScheduler specific, but given a hierarchy in the
+  // configuration structure of the schedulers (e.g., SchedulerConfiguration)
+  // it should be easy to remove this limitation
+  @Override
+  public void init(String reservationQueuePath, Configuration conf) {
+    this.conf = (CapacitySchedulerConfiguration) conf;
+    validWindow = this.conf.getReservationWindow(reservationQueuePath);
+    maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
+    maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
+  };
+
+  @Override
+  public void validate(Plan plan, ReservationAllocation reservation)
+      throws PlanningException {
+
+    // this is entire method invoked under a write-lock on the plan, no need
+    // to synchronize accesses to the plan further
+
+    // Try to verify whether there is already a reservation with this ID in
+    // the system (remove its contribution during validation to simulate a
+    // try-n-swap
+    // update).
+    ReservationAllocation oldReservation =
+        plan.getReservationById(reservation.getReservationId());
+
+    // sanity check that the update of a reservation is not changing username
+    if (oldReservation != null
+        && !oldReservation.getUser().equals(reservation.getUser())) {
+      throw new MismatchedUserException(
+          "Updating an existing reservation with mismatched user:"
+              + oldReservation.getUser() + " != " + reservation.getUser());
+    }
+
+    long startTime = reservation.getStartTime();
+    long endTime = reservation.getEndTime();
+    long step = plan.getStep();
+
+    Resource planTotalCapacity = plan.getTotalCapacity();
+
+    Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
+    Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+
+    // define variable that will store integral of resources (need diff class to
+    // avoid overflow issues for long/large allocations)
+    IntegralResource runningTot = new IntegralResource(0L, 0L);
+    IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
+    maxAllowed.multiplyBy(validWindow / step);
+
+    // check that the resources offered to the user during any window of length
+    // "validWindow" overlapping this allocation are within maxAllowed
+    // also enforce instantaneous and physical constraints during this pass
+    for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
+
+      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
+      Resource currExistingAllocForUser =
+          plan.getConsumptionForUser(reservation.getUser(), t);
+      Resource currNewAlloc = reservation.getResourcesAtTime(t);
+      Resource currOldAlloc = Resources.none();
+      if (oldReservation != null) {
+        currOldAlloc = oldReservation.getResourcesAtTime(t);
+      }
+
+      // throw exception if the cluster is overcommitted
+      // tot_allocated - old + new > capacity
+      Resource inst =
+          Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
+              currOldAlloc);
+      if (Resources.greaterThan(plan.getResourceCalculator(),
+          planTotalCapacity, inst, planTotalCapacity)) {
+        throw new ResourceOverCommitException(" Resources at time " + t
+            + " would be overcommitted (" + inst + " over "
+            + plan.getTotalCapacity() + ") by accepting reservation: "
+            + reservation.getReservationId());
+      }
+
+      // throw exception if instantaneous limits are violated
+      // tot_alloc_to_this_user - old + new > inst_limit
+      if (Resources.greaterThan(plan.getResourceCalculator(),
+          planTotalCapacity, Resources.subtract(
+              Resources.add(currExistingAllocForUser, currNewAlloc),
+              currOldAlloc), maxInsRes)) {
+        throw new PlanningQuotaException("Instantaneous quota capacity "
+            + maxInst + " would be passed at time " + t
+            + " by accepting reservation: " + reservation.getReservationId());
+      }
+
+      // throw exception if the running integral of utilization over validWindow
+      // is violated. We perform a delta check, adding/removing instants at the
+      // boundary of the window from runningTot.
+
+      // runningTot = previous_runningTot + currExistingAllocForUser +
+      // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
+
+      // Where:
+      // 1) currNewAlloc, currExistingAllocForUser represent the contribution of
+      // the instant in time added in this pass.
+      // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
+      // instants that are being retired from the the window
+      // 3) currOldAlloc is the contribution (if any) of the previous version of
+      // this reservation (the one we are updating)
+
+      runningTot.add(currExistingAllocForUser);
+      runningTot.add(currNewAlloc);
+      runningTot.subtract(currOldAlloc);
+
+      // expire contributions from instant in time before (t - validWindow)
+      if (t > startTime) {
+        Resource pastOldAlloc =
+            plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
+        Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
+
+        // runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
+        runningTot.subtract(pastOldAlloc);
+        runningTot.subtract(pastNewAlloc);
+      }
+
+      // check integral
+      // runningTot > maxAvg * validWindow
+      // NOTE: we need to use comparator of IntegralResource directly, as
+      // Resource and ResourceCalculator assume "int" amount of resources,
+      // which is not sufficient when comparing integrals (out-of-bound)
+      if (maxAllowed.compareTo(runningTot) < 0) {
+        throw new PlanningQuotaException(
+            "Integral (avg over time) quota capacity " + maxAvg
+                + " over a window of " + validWindow / 1000 + " seconds, "
+                + " would be passed at time " + t + "(" + new Date(t)
+                + ") by accepting reservation: "
+                + reservation.getReservationId());
+      }
+    }
+  }
+
+  @Override
+  public long getValidWindow() {
+    return validWindow;
+  }
+
+  /**
+   * This class provides support for Resource-like book-keeping, based on
+   * long(s), as using Resource to store the "integral" of the allocation over
+   * time leads to integer overflows for large allocations/clusters. (Evolving
+   * Resource to use long is too disruptive at this point.)
+   * 
+   * The comparison/multiplication behaviors of IntegralResource are consistent
+   * with the DefaultResourceCalculator.
+   */
+  public class IntegralResource {
+    long memory;
+    long vcores;
+
+    public IntegralResource(Resource resource) {
+      this.memory = resource.getMemory();
+      this.vcores = resource.getVirtualCores();
+    }
+
+    public IntegralResource(long mem, long vcores) {
+      this.memory = mem;
+      this.vcores = vcores;
+    }
+
+    public void add(Resource r) {
+      memory += r.getMemory();
+      vcores += r.getVirtualCores();
+    }
+
+    public void subtract(Resource r) {
+      memory -= r.getMemory();
+      vcores -= r.getVirtualCores();
+    }
+
+    public void multiplyBy(long window) {
+      memory = memory * window;
+      vcores = vcores * window;
+    }
+
+    public long compareTo(IntegralResource other) {
+      long diff = memory - other.memory;
+      if (diff == 0) {
+        diff = vcores - other.vcores;
+      }
+      return diff;
+    }
+
+    @Override
+    public String toString() {
+      return "<memory:" + memory + ", vCores:" + vcores + ">";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
new file mode 100644
index 0000000..cbe2b78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -0,0 +1,74 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This policy enforce a simple physical cluster capacity constraints, by
+ * validating that the allocation proposed fits in the current plan. This
+ * validation is compatible with "updates" and in verifying the capacity
+ * constraints it conceptually remove the prior version of the reservation.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class NoOverCommitPolicy implements SharingPolicy {
+
+  @Override
+  public void validate(Plan plan, ReservationAllocation reservation)
+      throws PlanningException {
+
+    ReservationAllocation oldReservation =
+        plan.getReservationById(reservation.getReservationId());
+
+    // check updates are using same name
+    if (oldReservation != null
+        && !oldReservation.getUser().equals(reservation.getUser())) {
+      throw new MismatchedUserException(
+          "Updating an existing reservation with mismatching user:"
+              + oldReservation.getUser() + " != " + reservation.getUser());
+    }
+
+    long startTime = reservation.getStartTime();
+    long endTime = reservation.getEndTime();
+    long step = plan.getStep();
+
+    // for every instant in time, check we are respecting cluster capacity
+    for (long t = startTime; t < endTime; t += step) {
+      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
+      Resource currNewAlloc = reservation.getResourcesAtTime(t);
+      Resource currOldAlloc = Resource.newInstance(0, 0);
+      if (oldReservation != null) {
+        oldReservation.getResourcesAtTime(t);
+      }
+      // check the cluster is never over committed
+      // currExistingAllocTot + currNewAlloc - currOldAlloc >
+      // capPlan.getTotalCapacity()
+      if (Resources.greaterThan(plan.getResourceCalculator(), plan
+          .getTotalCapacity(), Resources.subtract(
+          Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
+          plan.getTotalCapacity())) {
+        throw new ResourceOverCommitException("Resources at time " + t
+            + " would be overcommitted by " + "accepting reservation: "
+            + reservation.getReservationId());
+      }
+    }
+  }
+
+  @Override
+  public long getValidWindow() {
+    // this policy has no "memory" so the valid window is set to zero
+    return 0;
+  }
+
+  @Override
+  public void init(String inventoryQueuePath, Configuration conf) {
+    // nothing to do for this policy
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
new file mode 100644
index 0000000..d917764
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -0,0 +1,49 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * This is the interface for policy that validate new
+ * {@link ReservationAllocation}s for allocations being added to a {@link Plan}.
+ * Individual policies will be enforcing different invariants.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public interface SharingPolicy {
+
+  /**
+   * Initialize this policy
+   * 
+   * @param inventoryQueuePath the name of the queue for this plan
+   * @param conf the system configuration
+   */
+  public void init(String inventoryQueuePath, Configuration conf);
+
+  /**
+   * This method runs the policy validation logic, and return true/false on
+   * whether the {@link ReservationAllocation} is acceptable according to this
+   * sharing policy.
+   * 
+   * @param plan the {@link Plan} we validate against
+   * @param newAllocation the allocation proposed to be added to the
+   *          {@link Plan}
+   * @throws PlanningException if the policy is respected if we add this
+   *           {@link ReservationAllocation} to the {@link Plan}
+   */
+  public void validate(Plan plan, ReservationAllocation newAllocation)
+      throws PlanningException;
+
+  /**
+   * Returns the time range before and after the current reservation considered
+   * by this policy. In particular, this informs the archival process for the
+   * {@link Plan}, i.e., reservations regarding times before (now - validWindow)
+   * can be deleted.
+   * 
+   * @return validWindow the window of validity considered by the policy.
+   */
+  public long getValidWindow();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
index 7ee5a76..cd82a9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
@@ -1,5 +1,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
 
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception is thrown if the request made is not syntactically valid.
+ */
+@Public
+@Unstable
 public class ContractValidationException extends PlanningException {
 
   private static final long serialVersionUID = 1L;
@@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException {
     super(message);
   }
 
-  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
new file mode 100644
index 0000000..0a443f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exception thrown when an update to an existing reservation is performed
+ * by a user that is not the reservation owner. 
+ */
+@Public
+@Unstable
+public class MismatchedUserException extends PlanningException {
+
+  private static final long serialVersionUID = 8313222590561668413L;
+
+  public MismatchedUserException(String message) {
+    super(message);
+  }
+
+  public MismatchedUserException(Throwable cause) {
+    super(cause);
+  }
+
+  public MismatchedUserException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
index aa9e9fb..0699856 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
@@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
 /**
  * Exception thrown by the admission control subsystem when there is a problem
- * in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
+ * in trying to find an allocation for a user
+ * {@link ReservationSubmissionRequest}.
  */
+
+@Public
+@Unstable
 public class PlanningException extends Exception {
 
   private static final long serialVersionUID = -684069387367879218L;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
new file mode 100644
index 0000000..aad4ee8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception is thrown if the user quota is exceed while accepting or
+ * updating a reservation.
+ */
+@Public
+@Unstable
+public class PlanningQuotaException extends PlanningException {
+
+  private static final long serialVersionUID = 8206629288380246166L;
+
+  public PlanningQuotaException(String message) {
+    super(message);
+  }
+
+  public PlanningQuotaException(Throwable cause) {
+    super(cause);
+  }
+
+  public PlanningQuotaException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
new file mode 100644
index 0000000..a4c2b07
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception indicate that the reservation that has been attempted, would
+ * exceed the physical resources available in the {@link Plan} at the moment.
+ */
+@Public
+@Unstable
+public class ResourceOverCommitException extends PlanningException {
+
+  private static final long serialVersionUID = 7070699407526521032L;
+
+  public ResourceOverCommitException(String message) {
+    super(message);
+  }
+
+  public ResourceOverCommitException(Throwable cause) {
+    super(cause);
+  }
+
+  public ResourceOverCommitException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 5542ef3..ba501b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -190,6 +190,63 @@ public class CapacitySchedulerConfiguration extends Configuration {
     }
   }
   
+  @Private
+  public static final String AVERAGE_CAPACITY = "average-capacity";
+
+  @Private
+  public static final String IS_RESERVABLE = "reservable";
+
+  @Private
+  public static final String RESERVATION_WINDOW = "reservation-window";
+
+  @Private
+  public static final String INSTANTANEOUS_MAX_CAPACITY =
+      "instantaneous-max-capacity";
+
+  @Private
+  public static final long DEFAULT_RESERVATION_WINDOW = 0L;
+
+  @Private
+  public static final String RESERVATION_ADMISSION_POLICY =
+      "reservation-policy";
+
+  @Private
+  public static final String RESERVATION_AGENT_NAME = "reservation-agent";
+
+  @Private
+  public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
+      "show-reservations-as-queues";
+
+  @Private
+  public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
+
+  @Private
+  public static final String DEFAULT_RESERVATION_AGENT_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+
+  @Private
+  public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
+
+  @Private
+  public static final String DEFAULT_RESERVATION_PLANNER_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+
+  @Private
+  public static final String RESERVATION_MOVE_ON_EXPIRY =
+      "reservation-move-on-expiry";
+
+  @Private
+  public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
+
+  @Private
+  public static final String RESERVATION_ENFORCEMENT_WINDOW =
+      "reservation-enforcement-window";
+
+  // default to 1h lookahead enforcement
+  @Private
+  public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
+
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -511,4 +568,101 @@ public class CapacitySchedulerConfiguration extends Configuration {
 
     return mappings;
   }
+
+  public boolean isReservable(String queue) {
+    boolean isReservable =
+        getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
+    return isReservable;
+  }
+
+  public void setReservable(String queue, boolean isReservable) {
+    setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
+    LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
+        + ", isReservableQueue=" + isReservable(queue));
+  }
+
+  public long getReservationWindow(String queue) {
+    long reservationWindow =
+        getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
+            DEFAULT_RESERVATION_WINDOW);
+    return reservationWindow;
+  }
+
+  public float getAverageCapacity(String queue) {
+    float avgCapacity =
+        getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
+            MAXIMUM_CAPACITY_VALUE);
+    return avgCapacity;
+  }
+
+  public float getInstantaneousMaxCapacity(String queue) {
+    float instMaxCapacity =
+        getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
+            MAXIMUM_CAPACITY_VALUE);
+    return instMaxCapacity;
+  }
+
+  public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
+    setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
+        instMaxCapacity);
+  }
+
+  public void setReservationWindow(String queue, long reservationWindow) {
+    setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
+  }
+
+  public void setAverageCapacity(String queue, float avgCapacity) {
+    setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
+  }
+
+  public String getReservationAdmissionPolicy(String queue) {
+    String reservationPolicy =
+        get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
+            DEFAULT_RESERVATION_ADMISSION_POLICY);
+    return reservationPolicy;
+  }
+
+  public void setReservationAdmissionPolicy(String queue,
+      String reservationPolicy) {
+    set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
+  }
+
+  public String getReservationAgent(String queue) {
+    String reservationAgent =
+        get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
+            DEFAULT_RESERVATION_AGENT_NAME);
+    return reservationAgent;
+  }
+
+  public void setReservationAgent(String queue, String reservationPolicy) {
+    set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
+  }
+
+  public boolean getShowReservationAsQueues(String queuePath) {
+    boolean showReservationAsQueues =
+        getBoolean(getQueuePrefix(queuePath)
+            + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false);
+    return showReservationAsQueues;
+  }
+
+  public String getReplanner(String queue) {
+    String replanner =
+        get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
+            DEFAULT_RESERVATION_PLANNER_NAME);
+    return replanner;
+  }
+
+  public boolean getMoveOnExpiry(String queue) {
+    boolean killOnExpiry =
+        getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
+            DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
+    return killOnExpiry;
+  }
+
+  public long getEnforcementWindow(String queue) {
+    long enforcementWindow =
+        getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
+            DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
+    return enforcementWindow;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
new file mode 100644
index 0000000..83d6d3f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -0,0 +1,222 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCapacityOverTimePolicy {
+
+  long timeWindow;
+  long step;
+  float avgConstraint;
+  float instConstraint;
+  long initTime;
+
+  InMemoryPlan plan;
+  ReservationAgent mAgent;
+  Resource minAlloc;
+  ResourceCalculator res;
+  Resource maxAlloc;
+
+  int totCont = 1000000;
+
+  @Before
+  public void setup() throws Exception {
+
+    // 24h window
+    timeWindow = 86400000L;
+    // 1 sec step
+    step = 1000L;
+
+    // 25% avg cap on capacity
+    avgConstraint = 25;
+
+    // 70% instantaneous cap on capacity
+    instConstraint = 70;
+
+    initTime = System.currentTimeMillis();
+    minAlloc = Resource.newInstance(1024, 1);
+    res = new DefaultResourceCalculator();
+    maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    mAgent = mock(ReservationAgent.class);
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    capConf.setReservationWindow(reservationQ, timeWindow);
+    capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint);
+    capConf.setAverageCapacity(reservationQ, avgConstraint);
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, capConf);
+
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
+            "dedicated", null, true);
+  }
+
+  public int[] generateData(int length, int val) {
+    int[] data = new int[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = val;
+    }
+    return data;
+  }
+
+  @Test
+  public void testSimplePass() throws IOException, PlanningException {
+    // generate allocation that simply fit within all constraints
+    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+  }
+
+  @Test
+  public void testSimplePass2() throws IOException, PlanningException {
+    // generate allocation from single tenant that exceed avg momentarily but
+    // fit within
+    // max instantanesou
+    int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+  }
+
+  @Test
+  public void testMultiTenantPass() throws IOException, PlanningException {
+    // generate allocation from multiple tenants that barely fit in tot capacity
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 4; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              "dedicated", initTime, initTime + f.length,
+              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+              res, minAlloc)));
+    }
+  }
+
+  @Test(expected = ResourceOverCommitException.class)
+  public void testMultiTenantFail() throws IOException, PlanningException {
+    // generate allocation from multiple tenants that exceed tot capacity
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 5; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              "dedicated", initTime, initTime + f.length,
+              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+              res, minAlloc)));
+    }
+  }
+
+  @Test(expected = PlanningQuotaException.class)
+  public void testInstFail() throws IOException, PlanningException {
+    // generate allocation that exceed the instantaneous cap single-show
+    int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+    Assert.fail("should not have accepted this");
+  }
+
+  @Test
+  public void testInstFailBySum() throws IOException, PlanningException {
+    // generate allocation that exceed the instantaneous cap by sum
+    int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+    try {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+              "dedicated", initTime, initTime + f.length,
+              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+              res, minAlloc)));
+      Assert.fail();
+    } catch (PlanningQuotaException p) {
+      // expected
+    }
+  }
+
+  @Test(expected = PlanningQuotaException.class)
+  public void testFailAvg() throws IOException, PlanningException {
+    // generate an allocation which violates the 25% average single-shot
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    long win = timeWindow / 2 + 100;
+    int cont = (int) Math.ceil(0.5 * totCont);
+    req.put(new ReservationInterval(initTime, initTime + win),
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+  }
+
+  @Test
+  public void testFailAvgBySum() throws IOException, PlanningException {
+    // generate an allocation which violates the 25% average by sum
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    long win = 86400000 / 4 + 1;
+    int cont = (int) Math.ceil(0.5 * totCont);
+    req.put(new ReservationInterval(initTime, initTime + win),
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+
+    try {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+              "dedicated", initTime, initTime + win, req, res, minAlloc)));
+
+      Assert.fail("should not have accepted this");
+    } catch (PlanningQuotaException e) {
+      // expected
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6df0ddd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
new file mode 100644
index 0000000..2ceead3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -0,0 +1,144 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNoOverCommitPolicy {
+
+  long step;
+  long initTime;
+
+  InMemoryPlan plan;
+  ReservationAgent mAgent;
+  Resource minAlloc;
+  ResourceCalculator res;
+  Resource maxAlloc;
+
+  int totCont = 1000000;
+
+  @Before
+  public void setup() throws Exception {
+
+    // 1 sec step
+    step = 1000L;
+
+    initTime = System.currentTimeMillis();
+    minAlloc = Resource.newInstance(1024, 1);
+    res = new DefaultResourceCalculator();
+    maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    mAgent = mock(ReservationAgent.class);
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    NoOverCommitPolicy policy = new NoOverCommitPolicy();
+    policy.init(reservationQ, capConf);
+
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
+            "dedicated", null, true);
+  }
+
+  public int[] generateData(int length, int val) {
+    int[] data = new int[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = val;
+    }
+    return data;
+  }
+
+  @Test
+  public void testSingleUserEasyFitPass() throws IOException, PlanningException {
+    // generate allocation that easily fit within resource constraints
+    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+  }
+
+  @Test
+  public void testSingleUserBarelyFitPass() throws IOException,
+      PlanningException {
+    // generate allocation from single tenant that barely fit
+    int[] f = generateData(3600, totCont);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", initTime, initTime + f.length,
+            ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+            res, minAlloc)));
+  }
+
+  @Test(expected = ResourceOverCommitException.class)
+  public void testSingleFail() throws IOException, PlanningException {
+    // generate allocation from single tenant that exceed capacity
+    int[] f = generateData(3600, (int) (1.1 * totCont));
+    plan.addReservation(new InMemoryReservationAllocation(
+        ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+        "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
+            .generateAllocation(initTime, step, f), res, minAlloc));
+  }
+
+  @Test(expected = MismatchedUserException.class)
+  public void testUserMismatch() throws IOException, PlanningException {
+    // generate allocation from single tenant that exceed capacity
+    int[] f = generateData(3600, (int) (0.5 * totCont));
+
+    ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
+    plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
+        "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
+            .generateAllocation(initTime, step, f), res, minAlloc));
+
+    // trying to update a reservation with a mismatching user
+    plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
+        "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
+            .generateAllocation(initTime, step, f), res, minAlloc));
+  }
+
+  @Test
+  public void testMultiTenantPass() throws IOException, PlanningException {
+    // generate allocation from multiple tenants that barely fit in tot capacity
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 4; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              "dedicated", initTime, initTime + f.length,
+              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+              res, minAlloc)));
+    }
+  }
+
+  @Test(expected = ResourceOverCommitException.class)
+  public void testMultiTenantFail() throws IOException, PlanningException {
+    // generate allocation from multiple tenants that exceed tot capacity
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 5; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(new InMemoryReservationAllocation(
+              ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+              "dedicated", initTime, initTime + f.length,
+              ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+              res, minAlloc)));
+    }
+  }
+}


Mime
View raw message