hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [34/50] [abbrv] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
Date Tue, 28 Jul 2015 20:55:13 GMT
YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/156f24ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/156f24ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/156f24ea

Branch: refs/heads/YARN-1197
Commit: 156f24ead00436faad5d4aeef327a546392cd265
Parents: adcf5dd
Author: ccurino <ccurino@ubuntu.gateway.2wire.net>
Authored: Sat Jul 25 07:39:47 2015 -0700
Committer: ccurino <ccurino@ubuntu.gateway.2wire.net>
Committed: Sat Jul 25 07:39:47 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reservation/AbstractReservationSystem.java  |   2 +
 .../reservation/GreedyReservationAgent.java     | 390 ---------
 .../reservation/InMemoryPlan.java               |  13 +-
 .../InMemoryReservationAllocation.java          |   8 +-
 .../resourcemanager/reservation/Plan.java       |   1 +
 .../reservation/PlanContext.java                |   2 +
 .../resourcemanager/reservation/PlanView.java   |  31 +-
 .../resourcemanager/reservation/Planner.java    |  47 --
 .../RLESparseResourceAllocation.java            |  55 +-
 .../reservation/ReservationAgent.java           |  72 --
 .../ReservationSchedulerConfiguration.java      |   6 +-
 .../reservation/ReservationSystem.java          |   5 +-
 .../reservation/ReservationSystemUtil.java      |   6 +-
 .../reservation/SimpleCapacityReplanner.java    | 113 ---
 .../planning/AlignedPlannerWithGreedy.java      | 123 +++
 .../planning/GreedyReservationAgent.java        |  97 +++
 .../reservation/planning/IterativePlanner.java  | 338 ++++++++
 .../reservation/planning/Planner.java           |  49 ++
 .../reservation/planning/PlanningAlgorithm.java | 207 +++++
 .../reservation/planning/ReservationAgent.java  |  73 ++
 .../planning/SimpleCapacityReplanner.java       | 118 +++
 .../reservation/planning/StageAllocator.java    |  55 ++
 .../planning/StageAllocatorGreedy.java          | 152 ++++
 .../planning/StageAllocatorLowCostAligned.java  | 360 ++++++++
 .../planning/StageEarliestStart.java            |  46 ++
 .../planning/StageEarliestStartByDemand.java    | 106 +++
 .../StageEarliestStartByJobArrival.java         |  39 +
 .../planning/TryManyReservationAgents.java      | 114 +++
 .../reservation/ReservationSystemTestUtil.java  |   5 +-
 .../reservation/TestCapacityOverTimePolicy.java |   2 +-
 .../TestCapacitySchedulerPlanFollower.java      |   1 +
 .../reservation/TestFairReservationSystem.java  |   1 -
 .../TestFairSchedulerPlanFollower.java          |   1 +
 .../reservation/TestGreedyReservationAgent.java | 604 --------------
 .../reservation/TestInMemoryPlan.java           |   2 +
 .../reservation/TestNoOverCommitPolicy.java     |   1 +
 .../TestRLESparseResourceAllocation.java        |  51 +-
 .../TestSchedulerPlanFollowerBase.java          |   1 +
 .../TestSimpleCapacityReplanner.java            | 162 ----
 .../planning/TestAlignedPlanner.java            | 820 +++++++++++++++++++
 .../planning/TestGreedyReservationAgent.java    | 611 ++++++++++++++
 .../planning/TestSimpleCapacityReplanner.java   | 170 ++++
 43 files changed, 3634 insertions(+), 1429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 55258a6..883d009 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -147,6 +147,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2019. Retrospect on decision of making RM crashed if any exception throw 
     in ZKRMStateStore. (Jian He via junping_du)
 
+    YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. 
+    (Jonathan Yaniv and Ishai Menache via curino)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 8a15ac6..d2603c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
deleted file mode 100644
index 214df1c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This Agent employs a simple greedy placement strategy, placing the various
- * stages of a {@link ReservationRequest} from the deadline moving backward
- * towards the arrival. This allows jobs with earlier deadline to be scheduled
- * greedily as well. Combined with an opportunistic anticipation of work if the
- * cluster is not fully utilized also seems to provide good latency for
- * best-effort jobs (i.e., jobs running without a reservation).
- * 
- * This agent does not account for locality and only consider container
- * granularity for validation purposes (i.e., you can't exceed max-container
- * size).
- */
-public class GreedyReservationAgent implements ReservationAgent {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(GreedyReservationAgent.class);
-
-  @Override
-  public boolean createReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException {
-    return computeAllocation(reservationId, user, plan, contract, null);
-  }
-
-  @Override
-  public boolean updateReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException {
-    return computeAllocation(reservationId, user, plan, contract,
-        plan.getReservationById(reservationId));
-  }
-
-  @Override
-  public boolean deleteReservation(ReservationId reservationId, String user,
-      Plan plan) throws PlanningException {
-    return plan.deleteReservation(reservationId);
-  }
-
-  private boolean computeAllocation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract,
-      ReservationAllocation oldReservation) throws PlanningException,
-      ContractValidationException {
-    LOG.info("placing the following ReservationRequest: " + contract);
-
-    Resource totalCapacity = plan.getTotalCapacity();
-
-    // Here we can addd logic to adjust the ResourceDefinition to account for
-    // system "imperfections" (e.g., scheduling delays for large containers).
-
-    // Align with plan step conservatively (i.e., ceil arrival, and floor
-    // deadline)
-    long earliestStart = contract.getArrival();
-    long step = plan.getStep();
-    if (earliestStart % step != 0) {
-      earliestStart = earliestStart + (step - (earliestStart % step));
-    }
-    long deadline =
-        contract.getDeadline() - contract.getDeadline() % plan.getStep();
-
-    // setup temporary variables to handle time-relations between stages and
-    // intermediate answers
-    long curDeadline = deadline;
-    long oldDeadline = -1;
-
-    Map<ReservationInterval, Resource> allocations =
-        new HashMap<ReservationInterval, Resource>();
-    RLESparseResourceAllocation tempAssigned =
-        new RLESparseResourceAllocation(plan.getResourceCalculator(),
-            plan.getMinimumAllocation());
-
-    List<ReservationRequest> stages = contract.getReservationRequests()
-        .getReservationResources();
-    ReservationRequestInterpreter type = contract.getReservationRequests()
-        .getInterpreter();
-
-    boolean hasGang = false;
-
-    // Iterate the stages in backward from deadline
-    for (ListIterator<ReservationRequest> li = 
-        stages.listIterator(stages.size()); li.hasPrevious();) {
-
-      ReservationRequest currentReservationStage = li.previous();
-
-      // validate the RR respect basic constraints
-      validateInput(plan, currentReservationStage, totalCapacity);
-
-      hasGang |= currentReservationStage.getConcurrency() > 1;
-
-      // run allocation for a single stage
-      Map<ReservationInterval, Resource> curAlloc =
-          placeSingleStage(plan, tempAssigned, currentReservationStage,
-              earliestStart, curDeadline, oldReservation, totalCapacity);
-
-      if (curAlloc == null) {
-        // if we did not find an allocation for the currentReservationStage
-        // return null, unless the ReservationDefinition we are placing is of
-        // type ANY
-        if (type != ReservationRequestInterpreter.R_ANY) {
-          throw new PlanningException("The GreedyAgent"
-              + " couldn't find a valid allocation for your request");
-        } else {
-          continue;
-        }
-      } else {
-
-        // if we did find an allocation add it to the set of allocations
-        allocations.putAll(curAlloc);
-
-        // if this request is of type ANY we are done searching (greedy)
-        // and can return the current allocation (break-out of the search)
-        if (type == ReservationRequestInterpreter.R_ANY) {
-          break;
-        }
-
-        // if the request is of ORDER or ORDER_NO_GAP we constraint the next
-        // round of allocation to precede the current allocation, by setting
-        // curDeadline
-        if (type == ReservationRequestInterpreter.R_ORDER
-            || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-          curDeadline = findEarliestTime(curAlloc.keySet());
-
-          // for ORDER_NO_GAP verify that the allocation found so far has no
-          // gap, return null otherwise (the greedy procedure failed to find a
-          // no-gap
-          // allocation)
-          if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
-              && oldDeadline > 0) {
-            if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
-                .getStep()) {
-              throw new PlanningException("The GreedyAgent"
-                  + " couldn't find a valid allocation for your request");
-            }
-          }
-          // keep the variable oldDeadline pointing to the last deadline we
-          // found
-          oldDeadline = curDeadline;
-        }
-      }
-    }
-
-    // / If we got here is because we failed to find an allocation for the
-    // ReservationDefinition give-up and report failure to the user
-    if (allocations.isEmpty()) {
-      throw new PlanningException("The GreedyAgent"
-          + " couldn't find a valid allocation for your request");
-    }
-
-    // create reservation with above allocations if not null/empty
-
-    Resource ZERO_RES = Resource.newInstance(0, 0);
-
-    long firstStartTime = findEarliestTime(allocations.keySet());
-    
-    // add zero-padding from arrival up to the first non-null allocation
-    // to guarantee that the reservation exists starting at arrival
-    if (firstStartTime > earliestStart) {
-      allocations.put(new ReservationInterval(earliestStart,
-          firstStartTime), ZERO_RES);
-      firstStartTime = earliestStart;
-      // consider to add trailing zeros at the end for simmetry
-    }
-
-    // Actually add/update the reservation in the plan.
-    // This is subject to validation as other agents might be placing
-    // in parallel and there might be sharing policies the agent is not
-    // aware off.
-    ReservationAllocation capReservation =
-        new InMemoryReservationAllocation(reservationId, contract, user,
-            plan.getQueueName(), firstStartTime,
-            findLatestTime(allocations.keySet()), allocations,
-            plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
-    if (oldReservation != null) {
-      return plan.updateReservation(capReservation);
-    } else {
-      return plan.addReservation(capReservation);
-    }
-  }
-
-  private void validateInput(Plan plan, ReservationRequest rr,
-      Resource totalCapacity) throws ContractValidationException {
-
-    if (rr.getConcurrency() < 1) {
-      throw new ContractValidationException("Gang Size should be >= 1");
-    }
-
-    if (rr.getNumContainers() <= 0) {
-      throw new ContractValidationException("Num containers should be >= 0");
-    }
-
-    // check that gangSize and numContainers are compatible
-    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
-      throw new ContractValidationException(
-          "Parallelism must be an exact multiple of gang size");
-    }
-
-    // check that the largest container request does not exceed
-    // the cluster-wide limit for container sizes
-    if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
-        rr.getCapability(), plan.getMaximumAllocation())) {
-      throw new ContractValidationException("Individual"
-          + " capability requests should not exceed cluster's maxAlloc");
-    }
-  }
-
-  /**
-   * This method actually perform the placement of an atomic stage of the
-   * reservation. The key idea is to traverse the plan backward for a
-   * "lease-duration" worth of time, and compute what is the maximum multiple of
-   * our concurrency (gang) parameter we can fit. We do this and move towards
-   * previous instant in time until the time-window is exhausted or we placed
-   * all the user request.
-   */
-  private Map<ReservationInterval, Resource> placeSingleStage(
-      Plan plan, RLESparseResourceAllocation tempAssigned,
-      ReservationRequest rr, long earliestStart, long curDeadline,
-      ReservationAllocation oldResAllocation, final Resource totalCapacity) {
-
-    Map<ReservationInterval, Resource> allocationRequests =
-        new HashMap<ReservationInterval, Resource>();
-
-    // compute the gang as a resource and get the duration
-    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
-    long dur = rr.getDuration();
-    long step = plan.getStep();
-
-    // ceil the duration to the next multiple of the plan step
-    if (dur % step != 0) {
-      dur += (step - (dur % step));
-    }
-
-    // we know for sure that this division has no remainder (part of contract
-    // with user, validate before
-    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
-
-    int maxGang = 0;
-
-    // loop trying to place until we are done, or we are considering
-    // an invalid range of times
-    while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
-
-      // as we run along we remember how many gangs we can fit, and what
-      // was the most constraining moment in time (we will restart just
-      // after that to place the next batch)
-      maxGang = gangsToPlace;
-      long minPoint = curDeadline;
-      int curMaxGang = maxGang;
-
-      // start placing at deadline (excluded due to [,) interval semantics and
-      // move backward
-      for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
-          && maxGang > 0; t = t - plan.getStep()) {
-
-        // As we run along we will logically remove the previous allocation for
-        // this reservation
-        // if one existed
-        Resource oldResCap = Resource.newInstance(0, 0);
-        if (oldResAllocation != null) {
-          oldResCap = oldResAllocation.getResourcesAtTime(t);
-        }
-
-        // compute net available resources
-        Resource netAvailableRes = Resources.clone(totalCapacity);
-        Resources.addTo(netAvailableRes, oldResCap);
-        Resources.subtractFrom(netAvailableRes,
-            plan.getTotalCommittedResources(t));
-        Resources.subtractFrom(netAvailableRes,
-            tempAssigned.getCapacityAtTime(t));
-
-        // compute maximum number of gangs we could fit
-        curMaxGang =
-            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
-                totalCapacity, netAvailableRes, gang));
-
-        // pick the minimum between available resources in this instant, and how
-        // many gangs we have to place
-        curMaxGang = Math.min(gangsToPlace, curMaxGang);
-
-        // compare with previous max, and set it. also remember *where* we found
-        // the minimum (useful for next attempts)
-        if (curMaxGang <= maxGang) {
-          maxGang = curMaxGang;
-          minPoint = t;
-        }
-      }
-
-      // if we were able to place any gang, record this, and decrement
-      // gangsToPlace
-      if (maxGang > 0) {
-        gangsToPlace -= maxGang;
-
-        ReservationInterval reservationInt =
-            new ReservationInterval(curDeadline - dur, curDeadline);
-        ReservationRequest reservationRequest =
-            ReservationRequest.newInstance(rr.getCapability(),
-                rr.getConcurrency() * maxGang, rr.getConcurrency(),
-                rr.getDuration());
-        // remember occupied space (plan is read-only till we find a plausible
-        // allocation for the entire request). This is needed since we might be
-        // placing other ReservationRequest within the same
-        // ReservationDefinition,
-        // and we must avoid double-counting the available resources
-        final Resource reservationRes = ReservationSystemUtil.toResource(
-            reservationRequest);
-        tempAssigned.addInterval(reservationInt, reservationRes);
-        allocationRequests.put(reservationInt, reservationRes);
-
-      }
-
-      // reset our new starting point (curDeadline) to the most constraining
-      // point so far, we will look "left" of that to find more places where
-      // to schedule gangs (for sure nothing on the "right" of this point can
-      // fit a full gang.
-      curDeadline = minPoint;
-    }
-
-    // if no gangs are left to place we succeed and return the allocation
-    if (gangsToPlace == 0) {
-      return allocationRequests;
-    } else {
-      // If we are here is becasue we did not manage to satisfy this request.
-      // So we need to remove unwanted side-effect from tempAssigned (needed
-      // for ANY).
-      for (Map.Entry<ReservationInterval, Resource> tempAllocation :
-        allocationRequests.entrySet()) {
-        tempAssigned.removeInterval(tempAllocation.getKey(),
-            tempAllocation.getValue());
-      }
-      // and return null to signal failure in this allocation
-      return null;
-    }
-  }
-
-  // finds the leftmost point of this set of ReservationInterval
-  private long findEarliestTime(Set<ReservationInterval> resInt) {
-    long ret = Long.MAX_VALUE;
-    for (ReservationInterval s : resInt) {
-      if (s.getStartTime() < ret) {
-        ret = s.getStartTime();
-      }
-    }
-    return ret;
-  }
-
-  // finds the rightmost point of this set of ReservationIntervals
-  private long findLatestTime(Set<ReservationInterval> resInt) {
-    long ret = Long.MIN_VALUE;
-    for (ReservationInterval s : resInt) {
-      if (s.getEndTime() > ret) {
-        ret = s.getEndTime();
-      }
-    }
-    return ret;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 50d66cf..abc9c98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.UTCClock;
@@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class InMemoryPlan implements Plan {
+/**
+ * This class represents an in memory representation of the state of our
+ * reservation system, and provides accelerated access to both individual
+ * reservations and aggregate utilization of resources over time.
+ */
+public class InMemoryPlan implements Plan {
 
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
 
@@ -75,7 +82,7 @@ class InMemoryPlan implements Plan {
 
   private Resource totalCapacity;
 
-  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
       String queueName, Planner replanner, boolean getMoveOnExpiry) {
@@ -83,7 +90,7 @@ class InMemoryPlan implements Plan {
         maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
   }
 
-  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
       String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index a4dd23b..42a2243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 /**
  * An in memory implementation of a reservation allocation using the
  * {@link RLESparseResourceAllocation}
- * 
+ *
  */
-class InMemoryReservationAllocation implements ReservationAllocation {
+public class InMemoryReservationAllocation implements ReservationAllocation {
 
   private final String planName;
   private final ReservationId reservationID;
@@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
 
   private RLESparseResourceAllocation resourcesOverTime;
 
-  InMemoryReservationAllocation(ReservationId reservationID,
+  public InMemoryReservationAllocation(ReservationId reservationID,
       ReservationDefinition contract, String user, String planName,
       long startTime, long endTime,
       Map<ReservationInterval, Resource> allocations,
@@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
         allocations, calculator, minAlloc, false);
   }
 
-  InMemoryReservationAllocation(ReservationId reservationID,
+  public InMemoryReservationAllocation(ReservationId reservationID,
       ReservationDefinition contract, String user, String planName,
       long startTime, long endTime,
       Map<ReservationInterval, Resource> allocations,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
index e8e9e29..f7ffbd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * A Plan represents the central data structure of a reservation system that

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
index 6d3506d..94e299e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index b49e99e..be68906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -1,26 +1,27 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *       http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * This interface provides a read-only view on the allocations made in this

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
deleted file mode 100644
index 57f28ff..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-public interface Planner {
-
-  /**
-   * Update the existing {@link Plan}, by adding/removing/updating existing
-   * reservations, and adding a subset of the reservation requests in the
-   * contracts parameter.
-   *
-   * @param plan the {@link Plan} to replan
-   * @param contracts the list of reservation requests
-   * @throws PlanningException
-   */
-  public void plan(Plan plan, List<ReservationDefinition> contracts)
-      throws PlanningException;
-
-  /**
-   * Initialize the replanner
-   *
-   * @param planQueueName the name of the queue for this plan
-   * @param conf the scheduler configuration
-   */
-  void init(String planQueueName, ReservationSchedulerConfiguration conf);
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 2957cc6..80f2ff7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter;
 
 /**
  * This is a run length encoded sparse data structure that maintains resource
- * allocations over time
+ * allocations over time.
  */
 public class RLESparseResourceAllocation {
 
@@ -74,7 +74,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Add a resource for the specified interval
-   * 
+   *
    * @param reservationInterval the interval for which the resource is to be
    *          added
    * @param totCap the resource to be added
@@ -138,7 +138,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Removes a resource for the specified interval
-   * 
+   *
    * @param reservationInterval the interval for which the resource is to be
    *          removed
    * @param totCap the resource to be removed
@@ -189,7 +189,7 @@ public class RLESparseResourceAllocation {
   /**
    * Returns the capacity, i.e. total resources allocated at the specified point
    * of time
-   * 
+   *
    * @param tick the time (UTC in ms) at which the capacity is requested
    * @return the resources allocated at the specified time
    */
@@ -208,7 +208,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Get the timestamp of the earliest resource allocation
-   * 
+   *
    * @return the timestamp of the first resource allocation
    */
   public long getEarliestStartTime() {
@@ -226,7 +226,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Get the timestamp of the latest resource allocation
-   * 
+   *
    * @return the timestamp of the last resource allocation
    */
   public long getLatestEndTime() {
@@ -244,7 +244,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Returns true if there are no non-zero entries
-   * 
+   *
    * @return true if there are no allocations or false otherwise
    */
   public boolean isEmpty() {
@@ -287,7 +287,7 @@ public class RLESparseResourceAllocation {
   /**
    * Returns the JSON string representation of the current resources allocated
    * over time
-   * 
+   *
    * @return the JSON string representation of the current resources allocated
    *         over time
    */
@@ -312,4 +312,43 @@ public class RLESparseResourceAllocation {
     }
   }
 
+  /**
+   * Returns the representation of the current resources allocated over time as
+   * an interval map.
+   *
+   * @return the representation of the current resources allocated over time as
+   *         an interval map.
+   */
+  public Map<ReservationInterval, Resource> toIntervalMap() {
+
+    readLock.lock();
+    try {
+      Map<ReservationInterval, Resource> allocations =
+          new TreeMap<ReservationInterval, Resource>();
+
+      // Empty
+      if (isEmpty()) {
+        return allocations;
+      }
+
+      Map.Entry<Long, Resource> lastEntry = null;
+      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+
+        if (lastEntry != null) {
+          ReservationInterval interval =
+              new ReservationInterval(lastEntry.getKey(), entry.getKey());
+          Resource resource = lastEntry.getValue();
+
+          allocations.put(interval, resource);
+        }
+
+        lastEntry = entry;
+      }
+      return allocations;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
deleted file mode 100644
index 6955036..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-/**
- * An entity that seeks to acquire resources to satisfy an user's contract
- */
-public interface ReservationAgent {
-
-  /**
-   * Create a reservation for the user that abides by the specified contract
-   * 
-   * @param reservationId the identifier of the reservation to be created.
-   * @param user the user who wants to create the reservation
-   * @param plan the Plan to which the reservation must be fitted
-   * @param contract encapsulates the resources the user requires for his
-   *          session
-   * 
-   * @return whether the create operation was successful or not
-   * @throws PlanningException if the session cannot be fitted into the plan
-   */
-  public boolean createReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException;
-
-  /**
-   * Update a reservation for the user that abides by the specified contract
-   * 
-   * @param reservationId the identifier of the reservation to be updated
-   * @param user the user who wants to create the session
-   * @param plan the Plan to which the reservation must be fitted
-   * @param contract encapsulates the resources the user requires for his
-   *          reservation
-   * 
-   * @return whether the update operation was successful or not
-   * @throws PlanningException if the reservation cannot be fitted into the plan
-   */
-  public boolean updateReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException;
-
-  /**
-   * Delete an user reservation
-   * 
-   * @param reservationId the identifier of the reservation to be deleted
-   * @param user the user who wants to create the reservation
-   * @param plan the Plan to which the session must be fitted
-   * 
-   * @return whether the delete operation was successful or not
-   * @throws PlanningException if the reservation cannot be fitted into the plan
-   */
-  public boolean deleteReservation(ReservationId reservationId, String user,
-      Plan plan) throws PlanningException;
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
index 2af1ffd..c430b1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 
 public abstract class ReservationSchedulerConfiguration extends Configuration {
 
@@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
 
   @InterfaceAudience.Private
   public static final String DEFAULT_RESERVATION_AGENT_NAME =
-      "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy";
 
   @InterfaceAudience.Private
   public static final String DEFAULT_RESERVATION_PLANNER_NAME =
-      "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner";
 
   @InterfaceAudience.Private
   public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index cb76dcf..3309693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * This interface is the one implemented by any system that wants to support

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
index 8affae4..5562adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.HashMap;
 import java.util.Map;
 
-final class ReservationSystemUtil {
+/**
+ * Simple helper class for static methods used to transform across
+ * common formats in tests
+ */
+public final class ReservationSystemUtil {
 
   private ReservationSystemUtil() {
     // not called

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
deleted file mode 100644
index b5a6a99..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This (re)planner scan a period of time from now to a maximum time window (or
- * the end of the last session, whichever comes first) checking the overall
- * capacity is not violated.
- * 
- * It greedily removes sessions in reversed order of acceptance (latest accepted
- * is the first removed).
- */
-public class SimpleCapacityReplanner implements Planner {
-
-  private static final Log LOG = LogFactory
-      .getLog(SimpleCapacityReplanner.class);
-
-  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-
-  private final Clock clock;
-
-  // this allows to control to time-span of this replanning
-  // far into the future time instants might be worth replanning for
-  // later on
-  private long lengthOfCheckZone;
-
-  public SimpleCapacityReplanner() {
-    this(new UTCClock());
-  }
-
-  @VisibleForTesting
-  SimpleCapacityReplanner(Clock clock) {
-    this.clock = clock;
-  }
-
-  @Override
-  public void init(String planQueueName,
-      ReservationSchedulerConfiguration conf) {
-    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
-  }
-
-  @Override
-  public void plan(Plan plan, List<ReservationDefinition> contracts)
-      throws PlanningException {
-
-    if (contracts != null) {
-      throw new RuntimeException(
-          "SimpleCapacityReplanner cannot handle new reservation contracts");
-    }
-
-    ResourceCalculator resCalc = plan.getResourceCalculator();
-    Resource totCap = plan.getTotalCapacity();
-    long now = clock.getTime();
-
-    // loop on all moment in time from now to the end of the check Zone
-    // or the end of the planned sessions whichever comes first
-    for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
-        plan.getStep()) {
-      Resource excessCap =
-          Resources.subtract(plan.getTotalCommittedResources(t), totCap);
-      // if we are violating
-      if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
-        // sorted on reverse order of acceptance, so newest reservations first
-        Set<ReservationAllocation> curReservations =
-            new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
-        for (Iterator<ReservationAllocation> resIter =
-            curReservations.iterator(); resIter.hasNext()
-            && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
-          ReservationAllocation reservation = resIter.next();
-          plan.deleteReservation(reservation.getReservationId());
-          excessCap =
-              Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
-          LOG.info("Removing reservation " + reservation.getReservationId()
-              + " to repair physical-resource constraints in the plan: "
-              + plan.getQueueName());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
new file mode 100644
index 0000000..a389928
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A planning algorithm that first runs LowCostAligned, and if it fails runs
+ * Greedy.
+ */
+public class AlignedPlannerWithGreedy implements ReservationAgent {
+
+  // Default smoothness factor
+  private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
+
+  // Log
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AlignedPlannerWithGreedy.class);
+
+  // Smoothness factor
+  private final ReservationAgent planner;
+
+  // Constructor
+  public AlignedPlannerWithGreedy() {
+    this(DEFAULT_SMOOTHNESS_FACTOR);
+  }
+
+  // Constructor
+  public AlignedPlannerWithGreedy(int smoothnessFactor) {
+
+    // List of algorithms
+    List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
+
+    // LowCostAligned planning algorithm
+    ReservationAgent algAligned =
+        new IterativePlanner(new StageEarliestStartByDemand(),
+            new StageAllocatorLowCostAligned(smoothnessFactor));
+    listAlg.add(algAligned);
+
+    // Greedy planning algorithm
+    ReservationAgent algGreedy =
+        new IterativePlanner(new StageEarliestStartByJobArrival(),
+            new StageAllocatorGreedy());
+    listAlg.add(algGreedy);
+
+    // Set planner:
+    // 1. Attempt to execute algAligned
+    // 2. If failed, fall back to algGreedy
+    planner = new TryManyReservationAgents(listAlg);
+
+  }
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("placing the following ReservationRequest: " + contract);
+
+    try {
+      boolean res =
+          planner.createReservation(reservationId, user, plan, contract);
+
+      if (res) {
+        LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      } else {
+        LOG.info("OUTCOME: FAILURE, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      }
+      return res;
+    } catch (PlanningException e) {
+      LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+          + ", Contract: " + contract.toString());
+      throw e;
+    }
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("updating the following ReservationRequest: " + contract);
+
+    return planner.updateReservation(reservationId, user, plan, contract);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    LOG.info("removing the following ReservationId: " + reservationId);
+
+    return planner.deleteReservation(reservationId, user, plan);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
new file mode 100644
index 0000000..db82a66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationDefinition} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadline to be scheduled
+ * greedily as well. Combined with an opportunistic anticipation of work if the
+ * cluster is not fully utilized also seems to provide good latency for
+ * best-effort jobs (i.e., jobs running without a reservation).
+ *
+ * This agent does not account for locality and only consider container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+
+public class GreedyReservationAgent implements ReservationAgent {
+
+  // Log
+  private static final Logger LOG = LoggerFactory
+      .getLogger(GreedyReservationAgent.class);
+
+  // Greedy planner
+  private final ReservationAgent planner = new IterativePlanner(
+      new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("placing the following ReservationRequest: " + contract);
+
+    try {
+      boolean res =
+          planner.createReservation(reservationId, user, plan, contract);
+
+      if (res) {
+        LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      } else {
+        LOG.info("OUTCOME: FAILURE, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      }
+      return res;
+    } catch (PlanningException e) {
+      LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+          + ", Contract: " + contract.toString());
+      throw e;
+    }
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("updating the following ReservationRequest: " + contract);
+
+    return planner.updateReservation(reservationId, user, plan, contract);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    LOG.info("removing the following ReservationId: " + reservationId);
+
+    return planner.deleteReservation(reservationId, user, plan);
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
new file mode 100644
index 0000000..342c2e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A planning algorithm consisting of two main phases. The algorithm iterates
+ * over the job stages in descending order. For each stage, the algorithm: 1.
+ * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
+ * is allocated. 2. Computes an allocation for the stage inside the interval.
+ *
+ * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
+ * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
+ * each stage is set as succcessorStartTime - the starting time of its
+ * succeeding stage (or jobDeadline if it is the last stage).
+ *
+ * The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
+ * setAlgComputeStageAllocation
+ */
+public class IterativePlanner extends PlanningAlgorithm {
+
+  // Modifications performed by the algorithm that are not been reflected in the
+  // actual plan while a request is still pending.
+  private RLESparseResourceAllocation planModifications;
+
+  // Data extracted from plan
+  private Map<Long, Resource> planLoads;
+  private Resource capacity;
+  private long step;
+
+  // Job parameters
+  private ReservationRequestInterpreter jobType;
+  private long jobArrival;
+  private long jobDeadline;
+
+  // Phase algorithms
+  private StageEarliestStart algStageEarliestStart = null;
+  private StageAllocator algStageAllocator = null;
+
+  // Constructor
+  public IterativePlanner(StageEarliestStart algEarliestStartTime,
+      StageAllocator algStageAllocator) {
+
+    setAlgStageEarliestStart(algEarliestStartTime);
+    setAlgStageAllocator(algStageAllocator);
+
+  }
+
+  @Override
+  public RLESparseResourceAllocation computeJobAllocation(Plan plan,
+      ReservationId reservationId, ReservationDefinition reservation)
+      throws ContractValidationException {
+
+    // Initialize
+    initialize(plan, reservation);
+
+    // If the job has been previously reserved, logically remove its allocation
+    ReservationAllocation oldReservation =
+        plan.getReservationById(reservationId);
+    if (oldReservation != null) {
+      ignoreOldAllocation(oldReservation);
+    }
+
+    // Create the allocations data structure
+    RLESparseResourceAllocation allocations =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+    // Get a reverse iterator for the set of stages
+    ListIterator<ReservationRequest> li =
+        reservation
+            .getReservationRequests()
+            .getReservationResources()
+            .listIterator(
+                reservation.getReservationRequests().getReservationResources()
+                    .size());
+
+    // Current stage
+    ReservationRequest currentReservationStage;
+
+    // Index, points on the current node
+    int index =
+        reservation.getReservationRequests().getReservationResources().size();
+
+    // Stage deadlines
+    long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
+    long successorStartingTime = -1;
+
+    // Iterate the stages in reverse order
+    while (li.hasPrevious()) {
+
+      // Get current stage
+      currentReservationStage = li.previous();
+      index -= 1;
+
+      // Validate that the ReservationRequest respects basic constraints
+      validateInputStage(plan, currentReservationStage);
+
+      // Compute an adjusted earliestStart for this resource
+      // (we need this to provision some space for the ORDER contracts)
+      long stageArrivalTime = reservation.getArrival();
+      if (jobType == ReservationRequestInterpreter.R_ORDER
+          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+        stageArrivalTime =
+            computeEarliestStartingTime(plan, reservation, index,
+                currentReservationStage, stageDeadline);
+      }
+      stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+      stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+
+      // Compute the allocation of a single stage
+      Map<ReservationInterval, Resource> curAlloc =
+          computeStageAllocation(plan, currentReservationStage,
+              stageArrivalTime, stageDeadline);
+
+      // If we did not find an allocation, return NULL
+      // (unless it's an ANY job, then we simply continue).
+      if (curAlloc == null) {
+
+        // If it's an ANY job, we can move to the next possible request
+        if (jobType == ReservationRequestInterpreter.R_ANY) {
+          continue;
+        }
+
+        // Otherwise, the job cannot be allocated
+        return null;
+
+      }
+
+      // Get the start & end time of the current allocation
+      Long stageStartTime = findEarliestTime(curAlloc.keySet());
+      Long stageEndTime = findLatestTime(curAlloc.keySet());
+
+      // If we did find an allocation for the stage, add it
+      for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+        allocations.addInterval(entry.getKey(), entry.getValue());
+      }
+
+      // If this is an ANY clause, we have finished
+      if (jobType == ReservationRequestInterpreter.R_ANY) {
+        break;
+      }
+
+      // If ORDER job, set the stageDeadline of the next stage to be processed
+      if (jobType == ReservationRequestInterpreter.R_ORDER
+          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+
+        // Verify that there is no gap, in case the job is ORDER_NO_GAP
+        if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
+            && successorStartingTime != -1
+            && successorStartingTime > stageEndTime) {
+
+          return null;
+
+        }
+
+        // Store the stageStartTime and set the new stageDeadline
+        successorStartingTime = stageStartTime;
+        stageDeadline = stageStartTime;
+
+      }
+
+    }
+
+    // If the allocation is empty, return an error
+    if (allocations.isEmpty()) {
+      return null;
+    }
+
+    return allocations;
+
+  }
+
+  protected void initialize(Plan plan, ReservationDefinition reservation) {
+
+    // Get plan step & capacity
+    capacity = plan.getTotalCapacity();
+    step = plan.getStep();
+
+    // Get job parameters (type, arrival time & deadline)
+    jobType = reservation.getReservationRequests().getInterpreter();
+    jobArrival = stepRoundUp(reservation.getArrival(), step);
+    jobDeadline = stepRoundDown(reservation.getDeadline(), step);
+
+    // Dirty read of plan load
+    planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+
+    // Initialize the plan modifications
+    planModifications =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+  }
+
+  private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
+      long endTime) {
+
+    // Create map
+    Map<Long, Resource> loads = new HashMap<Long, Resource>();
+
+    // Calculate the load for every time slot between [start,end)
+    for (long t = startTime; t < endTime; t += step) {
+      Resource load = plan.getTotalCommittedResources(t);
+      loads.put(t, load);
+    }
+
+    // Return map
+    return loads;
+
+  }
+
+  private void ignoreOldAllocation(ReservationAllocation oldReservation) {
+
+    // If there is no old reservation, return
+    if (oldReservation == null) {
+      return;
+    }
+
+    // Subtract each allocation interval from the planModifications
+    for (Entry<ReservationInterval, Resource> entry : oldReservation
+        .getAllocationRequests().entrySet()) {
+
+      // Read the entry
+      ReservationInterval interval = entry.getKey();
+      Resource resource = entry.getValue();
+
+      // Find the actual request
+      Resource negativeResource = Resources.multiply(resource, -1);
+
+      // Insert it into planModifications as a 'negative' request, to
+      // represent available resources
+      planModifications.addInterval(interval, negativeResource);
+
+    }
+
+  }
+
+  private void validateInputStage(Plan plan, ReservationRequest rr)
+      throws ContractValidationException {
+
+    // Validate concurrency
+    if (rr.getConcurrency() < 1) {
+      throw new ContractValidationException("Gang Size should be >= 1");
+    }
+
+    // Validate number of containers
+    if (rr.getNumContainers() <= 0) {
+      throw new ContractValidationException("Num containers should be > 0");
+    }
+
+    // Check that gangSize and numContainers are compatible
+    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+      throw new ContractValidationException(
+          "Parallelism must be an exact multiple of gang size");
+    }
+
+    // Check that the largest container request does not exceed the cluster-wide
+    // limit for container sizes
+    if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
+        rr.getCapability(), plan.getMaximumAllocation())) {
+
+      throw new ContractValidationException(
+          "Individual capability requests should not exceed cluster's " +
+          "maxAlloc");
+
+    }
+
+  }
+
+  // Call algEarliestStartTime()
+  protected long computeEarliestStartingTime(Plan plan,
+      ReservationDefinition reservation, int index,
+      ReservationRequest currentReservationStage, long stageDeadline) {
+
+    return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
+        currentReservationStage, stageDeadline);
+
+  }
+
+  // Call algStageAllocator
+  protected Map<ReservationInterval, Resource> computeStageAllocation(
+      Plan plan, ReservationRequest rr, long stageArrivalTime,
+      long stageDeadline) {
+
+    return algStageAllocator.computeStageAllocation(plan, planLoads,
+        planModifications, rr, stageArrivalTime, stageDeadline);
+
+  }
+
+  // Set the algorithm: algStageEarliestStart
+  public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+
+    this.algStageEarliestStart = alg;
+    return this; // To allow concatenation of setAlg() functions
+
+  }
+
+  // Set the algorithm: algStageAllocator
+  public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
+
+    this.algStageAllocator = alg;
+    return this; // To allow concatenation of setAlg() functions
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
new file mode 100644
index 0000000..abac6ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+public interface Planner {
+
+  /**
+   * Update the existing {@link Plan}, by adding/removing/updating existing
+   * reservations, and adding a subset of the reservation requests in the
+   * contracts parameter.
+   *
+   * @param plan the {@link Plan} to replan
+   * @param contracts the list of reservation requests
+   * @throws PlanningException
+   */
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException;
+
+  /**
+   * Initialize the replanner
+   *
+   * @param planQueueName the name of the queue for this plan
+   * @param conf the scheduler configuration
+   */
+  void init(String planQueueName, ReservationSchedulerConfiguration conf);
+}


Mime
View raw message