hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [2/3] hadoop git commit: Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.
Date Fri, 01 Sep 2017 22:17:00 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 658387b..100d38c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.io.IOException;
-import java.io.StringWriter;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,8 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.gson.stream.JsonWriter;
-
 /**
  * This is a run length encoded sparse data structure that maintains resource
  * allocations over time.
@@ -44,12 +41,14 @@ public class RLESparseResourceAllocation {
   private static final int THRESHOLD = 100;
   private static final Resource ZERO_RESOURCE = Resources.none();
 
-  private NavigableMap<Long, Resource> cumulativeCapacity =
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected NavigableMap<Long, Resource> cumulativeCapacity =
       new TreeMap<Long, Resource>();
 
   private final ReentrantReadWriteLock readWriteLock =
       new ReentrantReadWriteLock();
-  private final Lock readLock = readWriteLock.readLock();
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected final Lock readLock = readWriteLock.readLock();
   private final Lock writeLock = readWriteLock.writeLock();
 
   private final ResourceCalculator resourceCalculator;
@@ -236,34 +235,6 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Returns the JSON string representation of the current resources allocated
-   * over time.
-   *
-   * @return the JSON string representation of the current resources allocated
-   *         over time
-   */
-  public String toMemJSONString() {
-    StringWriter json = new StringWriter();
-    JsonWriter jsonWriter = new JsonWriter(json);
-    readLock.lock();
-    try {
-      jsonWriter.beginObject();
-      // jsonWriter.name("timestamp").value("resource");
-      for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
-        jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
-      }
-      jsonWriter.endObject();
-      jsonWriter.close();
-      return json.toString();
-    } catch (IOException e) {
-      // This should not happen
-      return "";
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
    * Returns the representation of the current resources allocated over time as
    * an interval map (in the defined non-null range).
    *
@@ -304,7 +275,7 @@ public class RLESparseResourceAllocation {
   public NavigableMap<Long, Resource> getCumulative() {
     readLock.lock();
     try {
-      return cumulativeCapacity;
+      return Collections.unmodifiableNavigableMap(cumulativeCapacity);
     } finally {
       readLock.unlock();
     }
@@ -437,8 +408,8 @@ public class RLESparseResourceAllocation {
       Resource val = Resources.negate(e.getValue());
       // test for negative value and throws
       if (operator == RLEOperator.subtractTestNonNegative
-          && (Resources.fitsIn(val, ZERO_RESOURCE) &&
-              !Resources.equals(val, ZERO_RESOURCE))) {
+          && (Resources.fitsIn(val, ZERO_RESOURCE)
+              && !Resources.equals(val, ZERO_RESOURCE))) {
         throw new PlanningException(
             "RLESparseResourceAllocation: merge failed as the "
                 + "resulting RLESparseResourceAllocation would be negative");
@@ -504,22 +475,29 @@ public class RLESparseResourceAllocation {
 
   }
 
+  /**
+   * Get a {@link RLESparseResourceAllocation} view of the {@link Resource}
+   * allocations between the specified start and end times.
+   *
+   * @param start the time from which the {@link Resource} allocations are
+   *          required
+   * @param end the time up to which the {@link Resource} allocations are
+   *          required
+   * @return the overlapping allocations
+   */
   public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
     readLock.lock();
     try {
       NavigableMap<Long, Resource> a = this.getCumulative();
-
       if (a != null && !a.isEmpty()) {
         // include the portion of previous entry that overlaps start
         if (start > a.firstKey()) {
           long previous = a.floorKey(start);
           a = a.tailMap(previous, true);
         }
-
         if (end < a.lastKey()) {
           a = a.headMap(end, true);
         }
-
       }
       RLESparseResourceAllocation ret =
           new RLESparseResourceAllocation(a, resourceCalculator);
@@ -527,7 +505,33 @@ public class RLESparseResourceAllocation {
     } finally {
       readLock.unlock();
     }
+  }
 
+  /**
+   * This method shifts the timestamps of all the {@link Resource} entries by
+   * the specified "delta".
+   *
+   * @param delta the time by which to shift the {@link Resource} allocations
+   */
+  public void shift(long delta) {
+    writeLock.lock();
+    try {
+      TreeMap<Long, Resource> newCum = new TreeMap<>();
+      long start;
+      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+        if (delta > 0) {
+          start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE
+              : entry.getKey() + delta;
+        } else {
+          start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE
+              : entry.getKey() + delta;
+        }
+        newCum.put(start, entry.getValue());
+      }
+      cumulativeCapacity = newCum;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /**
@@ -541,8 +545,8 @@ public class RLESparseResourceAllocation {
   /**
    * Get the maximum capacity across specified time instances. The search-space
    * is specified using the starting value, tick, and the periodic interval for
-   * search. Maximum resource allocation across tick, tick + period,
-   * tick + 2 * period,..., tick + n * period .. is returned.
+   * search. The maximum resource allocation across tick, tick + period,
+   * tick + 2 * period, ..., tick + n * period, ... is returned.
    *
    * @param tick the starting time instance
    * @param period interval at which capacity is evaluated
@@ -550,14 +554,19 @@ public class RLESparseResourceAllocation {
    */
   public Resource getMaximumPeriodicCapacity(long tick, long period) {
     Resource maxCapacity = ZERO_RESOURCE;
-    if (!cumulativeCapacity.isEmpty()) {
-      Long lastKey = cumulativeCapacity.lastKey();
-      for (long t = tick; t <= lastKey; t = t + period) {
-        maxCapacity = Resources.componentwiseMax(maxCapacity,
-            cumulativeCapacity.floorEntry(t).getValue());
+    readLock.lock();
+    try {
+      if (!cumulativeCapacity.isEmpty()) {
+        Long lastKey = cumulativeCapacity.lastKey();
+        for (long t = tick; t <= lastKey; t = t + period) {
+          maxCapacity = Resources.componentwiseMax(maxCapacity,
+              cumulativeCapacity.floorEntry(t).getValue());
+        }
       }
+      return maxCapacity;
+    } finally {
+      readLock.unlock();
     }
-    return maxCapacity;
   }
 
   /**
@@ -567,17 +576,17 @@ public class RLESparseResourceAllocation {
    * @return minimum resource allocation
    */
   public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
-    Resource minCapacity = Resource.newInstance(
-        Integer.MAX_VALUE, Integer.MAX_VALUE);
+    Resource minCapacity =
+        Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
     long start = interval.getStartTime();
     long end = interval.getEndTime();
     NavigableMap<Long, Resource> capacityRange =
-        this.getRangeOverlapping(start, end).getCumulative();
+        getRangeOverlapping(start, end).getCumulative();
     if (!capacityRange.isEmpty()) {
       for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
         if (entry.getValue() != null) {
-          minCapacity = Resources.componentwiseMin(minCapacity,
-              entry.getValue());
+          minCapacity =
+              Resources.componentwiseMin(minCapacity, entry.getValue());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 0da95ac..bb4a7fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -24,14 +24,16 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A ReservationAllocation represents a concrete allocation of resources over
  * time that satisfy a certain {@link ReservationDefinition}. This is used
  * internally by a {@link Plan} to store information about how each of the
  * accepted {@link ReservationDefinition} have been allocated.
  */
-public interface ReservationAllocation extends
-    Comparable<ReservationAllocation> {
+public interface ReservationAllocation
+    extends Comparable<ReservationAllocation> {
 
   /**
    * Returns the unique identifier {@link ReservationId} that represents the
@@ -40,28 +42,28 @@ public interface ReservationAllocation extends
    * @return reservationId the unique identifier {@link ReservationId} that
    *         represents the reservation
    */
-  public ReservationId getReservationId();
+  ReservationId getReservationId();
 
   /**
    * Returns the original {@link ReservationDefinition} submitted by the client
    * 
    * @return the {@link ReservationDefinition} submitted by the client
    */
-  public ReservationDefinition getReservationDefinition();
+  ReservationDefinition getReservationDefinition();
 
   /**
    * Returns the time at which the reservation is activated.
    * 
    * @return the time at which the reservation is activated
    */
-  public long getStartTime();
+  long getStartTime();
 
   /**
    * Returns the time at which the reservation terminates.
    * 
    * @return the time at which the reservation terminates
    */
-  public long getEndTime();
+  long getEndTime();
 
   /**
    * Returns the map of resources requested against the time interval for which
@@ -70,28 +72,28 @@ public interface ReservationAllocation extends
    * @return the allocationRequests the map of resources requested against the
    *         time interval for which they were
    */
-  public Map<ReservationInterval, Resource> getAllocationRequests();
+  Map<ReservationInterval, Resource> getAllocationRequests();
 
   /**
    * Return a string identifying the plan to which the reservation belongs
    * 
    * @return the plan to which the reservation belongs
    */
-  public String getPlanName();
+  String getPlanName();
 
   /**
    * Returns the user who requested the reservation
    * 
    * @return the user who requested the reservation
    */
-  public String getUser();
+  String getUser();
 
   /**
    * Returns whether the reservation has gang semantics or not
    * 
    * @return true if there is a gang request, false otherwise
    */
-  public boolean containsGangs();
+  boolean containsGangs();
 
   /**
    * Sets the time at which the reservation was accepted by the system
@@ -99,14 +101,14 @@ public interface ReservationAllocation extends
    * @param acceptedAt the time at which the reservation was accepted by the
    *          system
    */
-  public void setAcceptanceTimestamp(long acceptedAt);
+  void setAcceptanceTimestamp(long acceptedAt);
 
   /**
    * Returns the time at which the reservation was accepted by the system
    * 
    * @return the time at which the reservation was accepted by the system
    */
-  public long getAcceptanceTime();
+  long getAcceptanceTime();
 
   /**
    * Returns the capacity represented by cumulative resources reserved by the
@@ -116,12 +118,42 @@ public interface ReservationAllocation extends
    *          requested
    * @return the resources reserved at the specified time
    */
-  public Resource getResourcesAtTime(long tick);
+  Resource getResourcesAtTime(long tick);
+
+  /**
+   * Return a RLE representation of used resources.
+   *
+   * @return a RLE encoding of resources allocated over time.
+   */
+  RLESparseResourceAllocation getResourcesOverTime();
+
 
   /**
    * Return a RLE representation of used resources.
+   *
+   * @param start start of the time interval.
+   * @param end end of the time interval.
    * @return a RLE encoding of resources allocated over time.
    */
-  public RLESparseResourceAllocation getResourcesOverTime();
+  RLESparseResourceAllocation getResourcesOverTime(long start, long end);
+
+  /**
+   * Get the periodicity of this reservation representing the time period of the
+   * periodic job. Period is represented in milliseconds for periodic jobs.
+   * Period is 0 for non-periodic jobs.
+   *
+   * @return periodicity of this reservation
+   */
+  long getPeriodicity();
+
+  /**
+   * Set the periodicity of this reservation representing the time period of the
+   * periodic job. Period is represented in milliseconds for periodic jobs.
+   * Period is 0 for non-periodic jobs.
+   *
+   * @param period periodicity of this reservation
+   */
+  @VisibleForTesting
+  void setPeriodicity(long period);
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
index 027d066..a66d222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -44,6 +44,8 @@ public class ReservationInputValidator {
 
   /**
    * Utility class to validate reservation requests.
+   *
+   * @param clock the {@link Clock} to use
    */
   public ReservationInputValidator(Clock clock) {
     this.clock = clock;
@@ -53,22 +55,21 @@ public class ReservationInputValidator {
       ReservationId reservationId, String auditConstant) throws YarnException {
     // check if the reservation id is valid
     if (reservationId == null) {
-      String message =
-          "Missing reservation id."
-              + " Please try again by specifying a reservation id.";
+      String message = "Missing reservation id."
+          + " Please try again by specifying a reservation id.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     String queue = reservationSystem.getQueueForReservation(reservationId);
     String nullQueueErrorMessage =
-            "The specified reservation with ID: " + reservationId
-                    + " is unknown. Please try again with a valid reservation.";
+        "The specified reservation with ID: " + reservationId
+            + " is unknown. Please try again with a valid reservation.";
     String nullPlanErrorMessage = "The specified reservation: " + reservationId
-                            + " is not associated with any valid plan."
-                            + " Please try again with a valid reservation.";
+        + " is not associated with any valid plan."
+        + " Please try again with a valid reservation.";
     return getPlanFromQueue(reservationSystem, queue, auditConstant,
-            nullQueueErrorMessage, nullPlanErrorMessage);
+        nullQueueErrorMessage, nullPlanErrorMessage);
   }
 
   private void validateReservationDefinition(ReservationId reservationId,
@@ -77,17 +78,15 @@ public class ReservationInputValidator {
     String message = "";
     // check if deadline is in the past
     if (contract == null) {
-      message =
-          "Missing reservation definition."
-              + " Please try again by specifying a reservation definition.";
+      message = "Missing reservation definition."
+          + " Please try again by specifying a reservation definition.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     if (contract.getDeadline() <= clock.getTime()) {
-      message =
-          "The specified deadline: " + contract.getDeadline()
-              + " is the past. Please try again with deadline in the future.";
+      message = "The specified deadline: " + contract.getDeadline()
+          + " is the past. Please try again with deadline in the future.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -95,18 +94,16 @@ public class ReservationInputValidator {
     // Check if at least one RR has been specified
     ReservationRequests resReqs = contract.getReservationRequests();
     if (resReqs == null) {
-      message =
-          "No resources have been specified to reserve."
-              + "Please try again by specifying the resources to reserve.";
+      message = "No resources have been specified to reserve."
+          + "Please try again by specifying the resources to reserve.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
     List<ReservationRequest> resReq = resReqs.getReservationResources();
     if (resReq == null || resReq.isEmpty()) {
-      message =
-          "No resources have been specified to reserve."
-              + " Please try again by specifying the resources to reserve.";
+      message = "No resources have been specified to reserve."
+          + " Please try again by specifying the resources to reserve.";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -123,22 +120,18 @@ public class ReservationInputValidator {
       } else {
         minDuration += rr.getDuration();
       }
-      maxGangSize =
-          Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
-              maxGangSize,
-              Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+      maxGangSize = Resources.max(plan.getResourceCalculator(),
+          plan.getTotalCapacity(), maxGangSize,
+          Resources.multiply(rr.getCapability(), rr.getConcurrency()));
     }
     // verify the allocation is possible (skip for ANY)
     long duration = contract.getDeadline() - contract.getArrival();
-    if (duration < minDuration
-        && type != ReservationRequestInterpreter.R_ANY) {
-      message =
-          "The time difference ("
-              + (duration)
-              + ") between arrival (" + contract.getArrival() + ") "
-              + "and deadline (" + contract.getDeadline() + ") must "
-              + " be greater or equal to the minimum resource duration ("
-              + minDuration + ")";
+    if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) {
+      message = "The time difference (" + (duration) + ") between arrival ("
+          + contract.getArrival() + ") " + "and deadline ("
+          + contract.getDeadline() + ") must "
+          + " be greater or equal to the minimum resource duration ("
+          + minDuration + ")";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -148,10 +141,9 @@ public class ReservationInputValidator {
     if (Resources.greaterThan(plan.getResourceCalculator(),
         plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
         && type != ReservationRequestInterpreter.R_ANY) {
-      message =
-          "The size of the largest gang in the reservation definition ("
-              + maxGangSize + ") exceed the capacity available ("
-              + plan.getTotalCapacity() + " )";
+      message = "The size of the largest gang in the reservation definition ("
+          + maxGangSize + ") exceed the capacity available ("
+          + plan.getTotalCapacity() + " )";
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
@@ -179,32 +171,32 @@ public class ReservationInputValidator {
     }
   }
 
-  private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
-          queue, String auditConstant) throws YarnException {
+  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+      String queue, String auditConstant) throws YarnException {
     String nullQueueErrorMessage = "The queue is not specified."
-            + " Please try again with a valid reservable queue.";
+        + " Please try again with a valid reservable queue.";
     String nullPlanErrorMessage = "The specified queue: " + queue
-            + " is not managed by reservation system."
-            + " Please try again with a valid reservable queue.";
+        + " is not managed by reservation system."
+        + " Please try again with a valid reservable queue.";
     return getPlanFromQueue(reservationSystem, queue, auditConstant,
-            nullQueueErrorMessage, nullPlanErrorMessage);
+        nullQueueErrorMessage, nullPlanErrorMessage);
   }
 
-  private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
-          queue, String auditConstant, String nullQueueErrorMessage,
-          String nullPlanErrorMessage) throws YarnException {
+  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+      String queue, String auditConstant, String nullQueueErrorMessage,
+      String nullPlanErrorMessage) throws YarnException {
     if (queue == null || queue.isEmpty()) {
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
-              "validate reservation input", "ClientRMService",
-              nullQueueErrorMessage);
+          "validate reservation input", "ClientRMService",
+          nullQueueErrorMessage);
       throw RPCUtil.getRemoteException(nullQueueErrorMessage);
     }
     // check if the associated plan is valid
     Plan plan = reservationSystem.getPlan(queue);
     if (plan == null) {
       RMAuditLogger.logFailure("UNKNOWN", auditConstant,
-              "validate reservation input", "ClientRMService",
-              nullPlanErrorMessage);
+          "validate reservation input", "ClientRMService",
+          nullPlanErrorMessage);
       throw RPCUtil.getRemoteException(nullPlanErrorMessage);
     }
     return plan;
@@ -222,22 +214,21 @@ public class ReservationInputValidator {
    * @param reservationId the {@link ReservationId} associated with the current
    *          request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationSubmissionRequest(
-      ReservationSystem reservationSystem,
-      ReservationSubmissionRequest request, ReservationId reservationId)
-      throws YarnException {
+      ReservationSystem reservationSystem, ReservationSubmissionRequest request,
+      ReservationId reservationId) throws YarnException {
     String message;
     if (reservationId == null) {
-      message = "Reservation id cannot be null. Please try again " +
-        "specifying a valid reservation id by creating a new reservation id.";
+      message = "Reservation id cannot be null. Please try again specifying "
+          + " a valid reservation id by creating a new reservation id.";
       throw RPCUtil.getRemoteException(message);
     }
     // Check if it is a managed queue
     String queue = request.getQueue();
     Plan plan = getPlanFromQueue(reservationSystem, queue,
-            AuditConstants.SUBMIT_RESERVATION_REQUEST);
+        AuditConstants.SUBMIT_RESERVATION_REQUEST);
 
     validateReservationDefinition(reservationId,
         request.getReservationDefinition(), plan,
@@ -255,15 +246,14 @@ public class ReservationInputValidator {
    * @param request the {@link ReservationUpdateRequest} defining the resources
    *          required over time for the request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationUpdateRequest(
       ReservationSystem reservationSystem, ReservationUpdateRequest request)
       throws YarnException {
     ReservationId reservationId = request.getReservationId();
-    Plan plan =
-        validateReservation(reservationSystem, reservationId,
-            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    Plan plan = validateReservation(reservationSystem, reservationId,
+        AuditConstants.UPDATE_RESERVATION_REQUEST);
     validateReservationDefinition(reservationId,
         request.getReservationDefinition(), plan,
         AuditConstants.UPDATE_RESERVATION_REQUEST);
@@ -278,28 +268,26 @@ public class ReservationInputValidator {
    *
    * @param reservationSystem the {@link ReservationSystem} to validate against
    * @param request the {@link ReservationListRequest} defining search
-   *                parameters for reservations in the {@link ReservationSystem}
-   *                that is being validated against.
+   *          parameters for reservations in the {@link ReservationSystem} that
+   *          is being validated against.
    * @return the {@link Plan} to list reservations of.
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationListRequest(
-      ReservationSystem reservationSystem,
-      ReservationListRequest request)
+      ReservationSystem reservationSystem, ReservationListRequest request)
       throws YarnException {
     String queue = request.getQueue();
     if (request.getEndTime() < request.getStartTime()) {
-      String errorMessage = "The specified end time must be greater than " +
-              "the specified start time.";
+      String errorMessage = "The specified end time must be greater than "
+          + "the specified start time.";
       RMAuditLogger.logFailure("UNKNOWN",
-              AuditConstants.LIST_RESERVATION_REQUEST,
-              "validate list reservation input", "ClientRMService",
-              errorMessage);
+          AuditConstants.LIST_RESERVATION_REQUEST,
+          "validate list reservation input", "ClientRMService", errorMessage);
       throw RPCUtil.getRemoteException(errorMessage);
     }
     // Check if it is a managed queue
     return getPlanFromQueue(reservationSystem, queue,
-            AuditConstants.LIST_RESERVATION_REQUEST);
+        AuditConstants.LIST_RESERVATION_REQUEST);
   }
 
   /**
@@ -312,7 +300,7 @@ public class ReservationInputValidator {
    * @param request the {@link ReservationDeleteRequest} defining the resources
    *          required over time for the request
    * @return the {@link Plan} to submit the request to
-   * @throws YarnException
+   * @throws YarnException if validation fails
    */
   public Plan validateReservationDeleteRequest(
       ReservationSystem reservationSystem, ReservationDeleteRequest request)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index 8b62972..a6c8fcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -29,8 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
 
-import java.util.Map;
-
 /**
  * This interface is the one implemented by any system that wants to support
  * Reservations i.e. make {@code Resource} allocations in future. Implementors
@@ -57,7 +57,7 @@ public interface ReservationSystem extends Recoverable {
    * 
    * @param conf configuration
    * @param rmContext current context of the {@code ResourceManager}
-   * @throws YarnException
+   * @throws YarnException if initialization of the configured plan fails
    */
   void reinitialize(Configuration conf, RMContext rmContext)
       throws YarnException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index e458055..cbf0f38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -38,7 +38,7 @@ public interface SharingPolicy {
    * @param planQueuePath the name of the queue for this plan
    * @param conf the system configuration
    */
-  public void init(String planQueuePath, ReservationSchedulerConfiguration conf);
+  void init(String planQueuePath, ReservationSchedulerConfiguration conf);
 
   /**
    * This method runs the policy validation logic, and return true/false on
@@ -51,7 +51,7 @@ public interface SharingPolicy {
    * @throws PlanningException if the policy is not respected when we add
    *           this {@link ReservationAllocation} to the {@link Plan}
    */
-  public void validate(Plan plan, ReservationAllocation newAllocation)
+  void validate(Plan plan, ReservationAllocation newAllocation)
       throws PlanningException;
 
   /**
@@ -68,9 +68,13 @@ public interface SharingPolicy {
    * @param start the start time for the range we are querying
    * @param end the end time for the range we are querying
    * @param oldId (optional) the id of a reservation being updated
+   *
+   * @return the available resources expressed as a
+   *         {@link RLESparseResourceAllocation}
+   *
    * @throws PlanningException if the request is not valid
    */
-  public RLESparseResourceAllocation availableResources(
+  RLESparseResourceAllocation availableResources(
       RLESparseResourceAllocation available, Plan plan, String user,
       ReservationId oldId, long start, long end) throws PlanningException;
 
@@ -82,7 +86,6 @@ public interface SharingPolicy {
    * 
    * @return validWindow the window of validity considered by the policy.
    */
-  public long getValidWindow();
-
+  long getValidWindow();
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
index abac6ac..af0e712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -34,7 +34,7 @@ public interface Planner {
    *
    * @param plan the {@link Plan} to replan
    * @param contracts the list of reservation requests
-   * @throws PlanningException
+   * @throws PlanningException if operation is unsuccessful
    */
   public void plan(Plan plan, List<ReservationDefinition> contracts)
       throws PlanningException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 199bfa5..bbbf0d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -50,7 +50,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
    * @return whether the allocateUser function was successful or not
    *
    * @throws PlanningException if the session cannot be fitted into the plan
-   * @throws ContractValidationException
+   * @throws ContractValidationException if validation fails
    */
   protected boolean allocateUser(ReservationId reservationId, String user,
       Plan plan, ReservationDefinition contract,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index ec6d9c0..8934b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -50,7 +50,7 @@ public interface StageAllocator {
    *
    * @return The computed allocation (or null if the stage could not be
    *         allocated)
-   * @throws PlanningException
+   * @throws PlanningException if operation is unsuccessful
    */
   Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index da04336..d107487 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
 
     RLESparseResourceAllocation netAvailable =
         plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
-            stageDeadline);
+            stageDeadline, 0);
 
     netAvailable =
         RLESparseResourceAllocation.merge(plan.getResourceCalculator(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index ec83e02..ae7d91a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -83,9 +83,8 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
     int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
 
     // get available resources from plan
-    RLESparseResourceAllocation netRLERes =
-        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
-            stageDeadline);
+    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+        user, oldId, stageEarliestStart, stageDeadline, 0);
 
     // remove plan modifications
     netRLERes =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index e45f58c..c014549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -77,8 +77,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
     ResourceCalculator resCalc = plan.getResourceCalculator();
     Resource capacity = plan.getTotalCapacity();
 
-    RLESparseResourceAllocation netRLERes = plan
-        .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
+    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+        user, oldId, stageArrival, stageDeadline, 0);
 
     long step = plan.getStep();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index e99842e..5337e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.FileWriter;
 import java.io.IOException;
@@ -76,7 +79,8 @@ public class ReservationSystemTestUtil {
       String reservationQ, long timeWindow, float instConstraint,
       float avgConstraint) {
 
-    ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
+    ReservationSchedulerConfiguration realConf =
+        new CapacitySchedulerConfiguration();
     ReservationSchedulerConfiguration conf = spy(realConf);
     when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
     when(conf.getInstantaneousMaxCapacity(reservationQ))
@@ -168,7 +172,6 @@ public class ReservationSystemTestUtil {
     scheduler.start();
     scheduler.reinitialize(conf, rmContext);
 
-
     Resource resource =
         ReservationSystemTestUtil.calculateClusterResource(numContainers);
     RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
@@ -184,10 +187,16 @@ public class ReservationSystemTestUtil {
 
   public static ReservationDefinition createSimpleReservationDefinition(
       long arrival, long deadline, long duration, int parallelism) {
+    return createSimpleReservationDefinition(arrival, deadline, duration,
+        parallelism, null);
+  }
+
+  public static ReservationDefinition createSimpleReservationDefinition(
+      long arrival, long deadline, long duration, int parallelism,
+      String recurrenceExpression) {
     // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-            parallelism, parallelism, duration);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), parallelism, parallelism, duration);
     ReservationDefinition rDef = new ReservationDefinitionPBImpl();
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setReservationResources(Collections.singletonList(r));
@@ -195,32 +204,31 @@ public class ReservationSystemTestUtil {
     rDef.setReservationRequests(reqs);
     rDef.setArrival(arrival);
     rDef.setDeadline(deadline);
+    if (recurrenceExpression != null) {
+      rDef.setRecurrenceExpression(recurrenceExpression);
+    }
     return rDef;
   }
 
   public static ReservationSubmissionRequest createSimpleReservationRequest(
       ReservationId reservationId, int numContainers, long arrival,
       long deadline, long duration) {
-    return createSimpleReservationRequest(reservationId, numContainers,
-        arrival, deadline, duration, Priority.UNDEFINED);
+    return createSimpleReservationRequest(reservationId, numContainers, arrival,
+        deadline, duration, Priority.UNDEFINED);
   }
 
   public static ReservationSubmissionRequest createSimpleReservationRequest(
       ReservationId reservationId, int numContainers, long arrival,
       long deadline, long duration, Priority priority) {
     // create a request with a single atomic ask
-    ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-            numContainers, 1, duration);
-    ReservationRequests reqs =
-        ReservationRequests.newInstance(Collections.singletonList(r),
-            ReservationRequestInterpreter.R_ALL);
-    ReservationDefinition rDef =
-        ReservationDefinition.newInstance(arrival, deadline, reqs,
-            "testClientRMService#reservation", "0", priority);
-    ReservationSubmissionRequest request =
-        ReservationSubmissionRequest.newInstance(rDef,
-            reservationQ, reservationId);
+    ReservationRequest r = ReservationRequest
+        .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration);
+    ReservationRequests reqs = ReservationRequests.newInstance(
+        Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+        deadline, reqs, "testClientRMService#reservation", "0", priority);
+    ReservationSubmissionRequest request = ReservationSubmissionRequest
+        .newInstance(rDef, reservationQ, reservationId);
     return request;
   }
 
@@ -252,9 +260,9 @@ public class ReservationSystemTestUtil {
     return cs;
   }
 
-  @SuppressWarnings("rawtypes") public static void initializeRMContext(
-      int numContainers, AbstractYarnScheduler scheduler,
-      RMContext mockRMContext) {
+  @SuppressWarnings("rawtypes")
+  public static void initializeRMContext(int numContainers,
+      AbstractYarnScheduler scheduler, RMContext mockRMContext) {
 
     when(mockRMContext.getScheduler()).thenReturn(scheduler);
     Resource r = calculateClusterResource(numContainers);
@@ -262,26 +270,25 @@ public class ReservationSystemTestUtil {
   }
 
   public static RMContext createRMContext(Configuration conf) {
-    RMContext mockRmContext = Mockito.spy(
-        new RMContextImpl(null, null, null, null, null, null,
-            new RMContainerTokenSecretManager(conf),
-            new NMTokenSecretManagerInRM(conf),
-            new ClientToAMTokenSecretManagerInRM(), null));
+    RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null,
+        null, null, null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null));
 
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
     when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
-            any(Resource.class))).thenAnswer(new Answer<Resource>() {
-      @Override public Resource answer(InvocationOnMock invocation)
-          throws Throwable {
-        Object[] args = invocation.getArguments();
-        return (Resource) args[2];
-      }
-    });
+        any(Resource.class))).thenAnswer(new Answer<Resource>() {
+          @Override
+          public Resource answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return (Resource) args[2];
+          }
+        });
 
     when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
         .thenAnswer(new Answer<Resource>() {
-          @Override public Resource answer(InvocationOnMock invocation)
-              throws Throwable {
+          @Override
+          public Resource answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
             return (Resource) args[1];
           }
@@ -304,9 +311,8 @@ public class ReservationSystemTestUtil {
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
 
-    final String dedicated =
-        CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
-            + reservationQ;
+    final String dedicated = CapacitySchedulerConfiguration.ROOT
+        + CapacitySchedulerConfiguration.DOT + reservationQ;
     conf.setCapacity(dedicated, 80);
     // Set as reservation queue
     conf.setReservable(dedicated, true);
@@ -405,26 +411,55 @@ public class ReservationSystemTestUtil {
 
   public static Map<ReservationInterval, Resource> generateAllocation(
       long startTime, long step, int[] alloc) {
+    return generateAllocation(startTime, step, alloc, null);
+  }
+
+  public static Map<ReservationInterval, Resource> generateAllocation(
+      long startTime, long step, int[] alloc, String recurrenceExpression) {
     Map<ReservationInterval, Resource> req = new TreeMap<>();
-    for (int i = 0; i < alloc.length; i++) {
-      req.put(new ReservationInterval(startTime + i * step,
-          startTime + (i + 1) * step), ReservationSystemUtil.toResource(
-          ReservationRequest
-              .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+    long period = 0;
+    if (recurrenceExpression != null) {
+      period = Long.parseLong(recurrenceExpression);
+    }
+
+    long rStart;
+    long rEnd;
+    for (int j = 0; j < 86400000; j += period) {
+      for (int i = 0; i < alloc.length; i++) {
+        rStart = (startTime + i * step) + j * period;
+        rEnd = (startTime + (i + 1) * step) + j * period;
+        if (period > 0) {
+          rStart = rStart % period + j * period;
+          rEnd = rEnd % period + j * period;
+          if (rStart > rEnd) {
+            // skip wrap-around entry
+            continue;
+          }
+        }
+
+        req.put(new ReservationInterval(rStart, rEnd),
+            ReservationSystemUtil.toResource(ReservationRequest
+                .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+      }
+      // execute only once if non-periodic
+      if (period == 0) {
+        break;
+      }
     }
     return req;
   }
 
-  public static RLESparseResourceAllocation
-      generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
+  public static RLESparseResourceAllocation generateRLESparseResourceAllocation(
+      int[] alloc, long[] timeSteps) {
     TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
     for (int i = 0; i < alloc.length; i++) {
       allocationsMap.put(timeSteps[i],
           Resource.newInstance(alloc[i], alloc[i]));
     }
-    RLESparseResourceAllocation rleVector =
-        new RLESparseResourceAllocation(allocationsMap,
-            new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleVector = new RLESparseResourceAllocation(
+        allocationsMap, new DefaultResourceCalculator());
     return rleVector;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message