hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [24/45] git commit: YARN-1709. In-memory data structures used to track resources over time to enable reservations.
Date Thu, 25 Sep 2014 20:52:34 GMT
YARN-1709. In-memory data structures used to track resources over time to enable reservations.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d8b2cd8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d8b2cd8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d8b2cd8

Branch: refs/heads/YARN-1051
Commit: 0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3
Parents: 3f2e3b2
Author: subru <subru@outlook.com>
Authored: Fri Sep 12 17:22:08 2014 -0700
Committer: subru <subru@outlook.com>
Committed: Thu Sep 25 13:06:20 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   3 +
 .../reservation/InMemoryPlan.java               | 507 +++++++++++++++++++
 .../InMemoryReservationAllocation.java          | 151 ++++++
 .../resourcemanager/reservation/Plan.java       |  32 ++
 .../reservation/PlanContext.java                | 101 ++++
 .../resourcemanager/reservation/PlanEdit.java   |  61 +++
 .../resourcemanager/reservation/PlanView.java   |  89 ++++
 .../RLESparseResourceAllocation.java            | 332 ++++++++++++
 .../reservation/ReservationAllocation.java      | 104 ++++
 .../reservation/ReservationInterval.java        |  67 +++
 .../exceptions/PlanningException.java           |  25 +
 .../reservation/ReservationSystemTestUtil.java  | 210 ++++++++
 .../reservation/TestInMemoryPlan.java           | 477 +++++++++++++++++
 .../TestInMemoryReservationAllocation.java      | 206 ++++++++
 .../TestRLESparseResourceAllocation.java        | 169 +++++++
 15 files changed, 2534 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index a7c08a0..410d974 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the
 ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
+
+YARN-1709. In-memory data structures used to track resources over time to
+enable reservations. (subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
new file mode 100644
index 0000000..99231c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -0,0 +1,507 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryPlan implements Plan {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
+      new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
+
+  private RLESparseResourceAllocation rleSparseVector;
+
+  private Map<String, RLESparseResourceAllocation> userResourceAlloc =
+      new HashMap<String, RLESparseResourceAllocation>();
+
+  private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
+      new HashMap<ReservationId, InMemoryReservationAllocation>();
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+  private final SharingPolicy policy;
+  private final ReservationAgent agent;
+  private final long step;
+  private final ResourceCalculator resCalc;
+  private final Resource minAlloc, maxAlloc;
+  private final String queueName;
+  private final QueueMetrics queueMetrics;
+  private final Planner replanner;
+  private final boolean getMoveOnExpiry;
+  private final Clock clock;
+
+  private Resource totalCapacity;
+
+  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+      ReservationAgent agent, Resource totalCapacity, long step,
+      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+      String queueName, Planner replanner, boolean getMoveOnExpiry) {
+    this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
+        maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
+  }
+
+  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+      ReservationAgent agent, Resource totalCapacity, long step,
+      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+      String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
+    this.queueMetrics = queueMetrics;
+    this.policy = policy;
+    this.agent = agent;
+    this.step = step;
+    this.totalCapacity = totalCapacity;
+    this.resCalc = resCalc;
+    this.minAlloc = minAlloc;
+    this.maxAlloc = maxAlloc;
+    this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
+    this.queueName = queueName;
+    this.replanner = replanner;
+    this.getMoveOnExpiry = getMoveOnExpiry;
+    this.clock = clock;
+  }
+
+  @Override
+  public QueueMetrics getQueueMetrics() {
+    return queueMetrics;
+  }
+
+  /**
+   * Adds the allocation requests of the given reservation to both the
+   * allocation tracked for its user and the plan-wide allocation. The caller
+   * must hold the write lock.
+   *
+   * @param reservation the reservation whose allocation requests are added
+   */
+  private void incrementAllocation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    Map<ReservationInterval, ReservationRequest> requests =
+        reservation.getAllocationRequests();
+    String owner = reservation.getUser();
+    // first reservation seen for this user: start tracking the user
+    RLESparseResourceAllocation ownerAlloc = userResourceAlloc.get(owner);
+    if (ownerAlloc == null) {
+      ownerAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
+      userResourceAlloc.put(owner, ownerAlloc);
+    }
+    for (Map.Entry<ReservationInterval, ReservationRequest> entry : requests
+        .entrySet()) {
+      ReservationInterval interval = entry.getKey();
+      ReservationRequest request = entry.getValue();
+      ownerAlloc.addInterval(interval, request);
+      rleSparseVector.addInterval(interval, request);
+    }
+  }
+
+  /**
+   * Removes the allocation requests of the given reservation from both the
+   * allocation tracked for its user and the plan-wide allocation, and stops
+   * tracking the user once its allocation is empty. The caller must hold the
+   * write lock.
+   *
+   * @param reservation the reservation whose allocation requests are removed
+   */
+  private void decrementAllocation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    Map<ReservationInterval, ReservationRequest> allocationRequests =
+        reservation.getAllocationRequests();
+    String user = reservation.getUser();
+    RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+        .entrySet()) {
+      resAlloc.removeInterval(r.getKey(), r.getValue());
+      rleSparseVector.removeInterval(r.getKey(), r.getValue());
+    }
+    if (resAlloc.isEmpty()) {
+      // the map is keyed by user name; passing the allocation object to
+      // remove() never matches a key and would leave the empty entry behind
+      userResourceAlloc.remove(user);
+    }
+  }
+
+  /**
+   * Returns a newly created set holding every reservation currently stored
+   * in the plan, regardless of the interval it is filed under.
+   *
+   * @return all reservations in the plan, or null if the reservation map is
+   *         null
+   */
+  public Set<ReservationAllocation> getAllReservations() {
+    readLock.lock();
+    try {
+      if (currentReservations == null) {
+        return null;
+      }
+      Set<ReservationAllocation> all = new HashSet<ReservationAllocation>();
+      for (Set<InMemoryReservationAllocation> bucket : currentReservations
+          .values()) {
+        all.addAll(bucket);
+      }
+      return all;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean addReservation(ReservationAllocation reservation)
+      throws PlanningException {
+    // Verify the allocation is memory based otherwise it is not supported
+    InMemoryReservationAllocation inMemReservation =
+        (InMemoryReservationAllocation) reservation;
+    if (inMemReservation.getUser() == null) {
+      String errMsg =
+          "The specified Reservation with ID "
+              + inMemReservation.getReservationId()
+              + " is not mapped to any user";
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+    writeLock.lock();
+    try {
+      if (reservationTable.containsKey(inMemReservation.getReservationId())) {
+        String errMsg =
+            "The specified Reservation with ID "
+                + inMemReservation.getReservationId() + " already exists";
+        LOG.error(errMsg);
+        throw new IllegalArgumentException(errMsg);
+      }
+      // Validate if we can accept this reservation, throws exception if
+      // validation fails
+      policy.validate(this, inMemReservation);
+      // we record here the time in which the allocation has been accepted
+      reservation.setAcceptanceTimestamp(clock.getTime());
+      ReservationInterval searchInterval =
+          new ReservationInterval(inMemReservation.getStartTime(),
+              inMemReservation.getEndTime());
+      Set<InMemoryReservationAllocation> reservations =
+          currentReservations.get(searchInterval);
+      if (reservations == null) {
+        reservations = new HashSet<InMemoryReservationAllocation>();
+      }
+      if (!reservations.add(inMemReservation)) {
+        LOG.error("Unable to add reservation: {} to plan.",
+            inMemReservation.getReservationId());
+        return false;
+      }
+      currentReservations.put(searchInterval, reservations);
+      reservationTable.put(inMemReservation.getReservationId(),
+          inMemReservation);
+      incrementAllocation(inMemReservation);
+      LOG.info("Sucessfully added reservation: {} to plan.",
+          inMemReservation.getReservationId());
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean updateReservation(ReservationAllocation reservation)
+      throws PlanningException {
+    writeLock.lock();
+    boolean result = false;
+    try {
+      ReservationId resId = reservation.getReservationId();
+      ReservationAllocation currReservation = getReservationById(resId);
+      if (currReservation == null) {
+        String errMsg =
+            "The specified Reservation with ID " + resId
+                + " does not exist in the plan";
+        LOG.error(errMsg);
+        throw new IllegalArgumentException(errMsg);
+      }
+      if (!removeReservation(currReservation)) {
+        LOG.error("Unable to replace reservation: {} from plan.",
+            reservation.getReservationId());
+        return result;
+      }
+      try {
+        result = addReservation(reservation);
+      } catch (PlanningException e) {
+        LOG.error("Unable to update reservation: {} from plan due to {}.",
+            reservation.getReservationId(), e.getMessage());
+      }
+      if (result) {
+        LOG.info("Sucessfully updated reservation: {} in plan.",
+            reservation.getReservationId());
+        return result;
+      } else {
+        // rollback delete
+        addReservation(currReservation);
+        LOG.info("Rollbacked update reservation: {} from plan.",
+            reservation.getReservationId());
+        return result;
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Removes the given reservation from the interval map, the id table and
+   * the tracked allocations. The caller must hold the write lock.
+   *
+   * @param reservation the reservation to remove
+   * @return true if the reservation was removed, false if it was not in the
+   *         set stored for its interval
+   * @throws IllegalArgumentException if nothing is stored for the interval
+   *           of the reservation
+   */
+  private boolean removeReservation(ReservationAllocation reservation) {
+    assert (readWriteLock.isWriteLockedByCurrentThread());
+    ReservationInterval bucketKey =
+        new ReservationInterval(reservation.getStartTime(),
+            reservation.getEndTime());
+    Set<InMemoryReservationAllocation> bucket =
+        currentReservations.get(bucketKey);
+    if (bucket == null) {
+      String errMsg =
+          "The specified Reservation with ID " + reservation.getReservationId()
+              + " does not exist in the plan";
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+    boolean wasPresent = bucket.remove(reservation);
+    if (!wasPresent) {
+      LOG.error("Unable to remove reservation: {} from plan.",
+          reservation.getReservationId());
+      return false;
+    }
+    // drop the interval entry once its last reservation is gone
+    if (bucket.isEmpty()) {
+      currentReservations.remove(bucketKey);
+    }
+    reservationTable.remove(reservation.getReservationId());
+    decrementAllocation(reservation);
+    LOG.info("Sucessfully deleted reservation: {} in plan.",
+        reservation.getReservationId());
+    return true;
+  }
+
+  /**
+   * Deletes the reservation with the given id from the plan.
+   *
+   * @param reservationID the id of the reservation to delete
+   * @return the outcome of removing the reservation from the plan
+   * @throws IllegalArgumentException if no reservation with the given id
+   *           exists in the plan
+   */
+  @Override
+  public boolean deleteReservation(ReservationId reservationID) {
+    writeLock.lock();
+    try {
+      ReservationAllocation target = getReservationById(reservationID);
+      if (target != null) {
+        return removeReservation(target);
+      }
+      String errMsg =
+          "The specified Reservation with ID " + reservationID
+              + " does not exist in the plan";
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Removes the reservations whose end time is at or before the given tick
+   * minus the valid window of the sharing policy. Candidates are collected
+   * under the read lock; the write lock is taken only if there is something
+   * to remove.
+   *
+   * @param tick the time from which the archival window is computed
+   */
+  @Override
+  public void archiveCompletedReservations(long tick) {
+    LOG.debug("Running archival at time: {}", tick);
+    List<InMemoryReservationAllocation> expiredReservations =
+        new ArrayList<InMemoryReservationAllocation>();
+    // Since we are looking for old reservations, read lock is optimal
+    readLock.lock();
+    // archive reservations and delete the ones which are beyond
+    // the reservation policy "window"
+    try {
+      long archivalTime = tick - policy.getValidWindow();
+      ReservationInterval searchInterval =
+          new ReservationInterval(archivalTime, archivalTime);
+      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
+          currentReservations.headMap(searchInterval, true);
+      for (Set<InMemoryReservationAllocation> reservationEntries : reservations
+          .values()) {
+        for (InMemoryReservationAllocation reservation : reservationEntries) {
+          if (reservation.getEndTime() <= archivalTime) {
+            expiredReservations.add(reservation);
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    if (expiredReservations.isEmpty()) {
+      return;
+    }
+    // Need write lock only if there are any reservations to be deleted
+    writeLock.lock();
+    try {
+      for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
+        // no lock was held between the two phases, so the reservation may
+        // have been deleted or replaced meanwhile; only remove it if it is
+        // still the instance stored in the plan, otherwise the removal
+        // would throw and abort the rest of the sweep
+        if (reservationTable.get(expiredReservation.getReservationId())
+            == expiredReservation) {
+          removeReservation(expiredReservation);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the reservations taken from the head of the interval map up to
+   * and including (tick, Long.MAX_VALUE) whose end time is strictly after
+   * the given tick.
+   *
+   * @param tick the time of interest
+   * @return an unmodifiable set of the matching reservations; the shared
+   *         empty set if the head of the interval map is empty
+   */
+  @Override
+  public Set<ReservationAllocation> getReservationsAtTime(long tick) {
+    ReservationInterval upperBound =
+        new ReservationInterval(tick, Long.MAX_VALUE);
+    readLock.lock();
+    try {
+      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> candidates =
+          currentReservations.headMap(upperBound, true);
+      if (candidates.isEmpty()) {
+        return Collections.emptySet();
+      }
+      Set<ReservationAllocation> active =
+          new HashSet<ReservationAllocation>();
+      for (Set<InMemoryReservationAllocation> bucket : candidates.values()) {
+        for (InMemoryReservationAllocation candidate : bucket) {
+          // keep only the reservations that have not ended by tick
+          if (candidate.getEndTime() > tick) {
+            active.add(candidate);
+          }
+        }
+      }
+      return Collections.unmodifiableSet(active);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getStep() {
+    return step;
+  }
+
+  @Override
+  public SharingPolicy getSharingPolicy() {
+    return policy;
+  }
+
+  @Override
+  public ReservationAgent getReservationAgent() {
+    return agent;
+  }
+
+  /**
+   * Returns the capacity tracked for the given user at time t.
+   *
+   * @param user the user of interest
+   * @param t the time of interest
+   * @return the capacity recorded for the user at time t, or a fresh zero
+   *         resource if nothing is tracked for the user
+   */
+  @Override
+  public Resource getConsumptionForUser(String user, long t) {
+    readLock.lock();
+    try {
+      RLESparseResourceAllocation tracked = userResourceAlloc.get(user);
+      if (tracked == null) {
+        return Resources.clone(ZERO_RESOURCE);
+      }
+      return tracked.getCapacityAtTime(t);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getTotalCommittedResources(long t) {
+    readLock.lock();
+    try {
+      return rleSparseVector.getCapacityAtTime(t);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Looks up the reservation stored under the given id.
+   *
+   * @param reservationID the id to look up, may be null
+   * @return the stored reservation, or null if the id is null or unknown
+   */
+  @Override
+  public ReservationAllocation getReservationById(ReservationId reservationID) {
+    if (reservationID != null) {
+      readLock.lock();
+      try {
+        return reservationTable.get(reservationID);
+      } finally {
+        readLock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Resource getTotalCapacity() {
+    readLock.lock();
+    try {
+      return Resources.clone(totalCapacity);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getMinimumAllocation() {
+    return Resources.clone(minAlloc);
+  }
+
+  @Override
+  public void setTotalCapacity(Resource cap) {
+    writeLock.lock();
+    try {
+      totalCapacity = Resources.clone(cap);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public long getEarliestStartTime() {
+    readLock.lock();
+    try {
+      return rleSparseVector.getEarliestStartTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getLastEndTime() {
+    readLock.lock();
+    try {
+      return rleSparseVector.getLatestEndTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return resCalc;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public Resource getMaximumAllocation() {
+    return Resources.clone(maxAlloc);
+  }
+
+  public String toCumulativeString() {
+    readLock.lock();
+    try {
+      return rleSparseVector.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Planner getReplanner() {
+    return replanner;
+  }
+
+  @Override
+  public boolean getMoveOnExpiry() {
+    return getMoveOnExpiry;
+  }
+
+  /**
+   * Builds a textual dump of the plan: queue name, total capacity and step
+   * followed by every reservation in the plan.
+   *
+   * @return the textual representation of this plan
+   */
+  @Override
+  public String toString() {
+    readLock.lock();
+    try {
+      StringBuilder text = new StringBuilder("In-memory Plan: ");
+      text.append("Parent Queue: ");
+      text.append(queueName);
+      text.append("Total Capacity: ");
+      text.append(totalCapacity);
+      text.append("Step: ");
+      text.append(step);
+      for (ReservationAllocation entry : getAllReservations()) {
+        text.append(entry);
+      }
+      return text.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
new file mode 100644
index 0000000..10cc55f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -0,0 +1,151 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * An in memory implementation of a reservation allocation using the
+ * {@link RLESparseResourceAllocation}
+ * 
+ */
+class InMemoryReservationAllocation implements ReservationAllocation {
+
+  private final String planName;
+  private final ReservationId reservationID;
+  private final String user;
+  private final ReservationDefinition contract;
+  private final long startTime;
+  private final long endTime;
+  private final Map<ReservationInterval, ReservationRequest> allocationRequests;
+  private boolean hasGang = false;
+  private long acceptedAt = -1;
+
+  private RLESparseResourceAllocation resourcesOverTime;
+
+  /**
+   * Creates an allocation and accumulates the given allocation requests
+   * into its resources-over-time view. The allocation is flagged as
+   * containing gangs if any request has a concurrency greater than one.
+   *
+   * @param reservationID the id of the reservation
+   * @param contract the definition the reservation was created from
+   * @param user the user the reservation belongs to
+   * @param planName the name of the plan holding the reservation
+   * @param startTime the start time of the reservation
+   * @param endTime the end time of the reservation
+   * @param allocationRequests the requests per interval
+   * @param calculator the calculator handed to the resources-over-time view
+   * @param minAlloc the minimum allocation handed to the same view
+   */
+  InMemoryReservationAllocation(ReservationId reservationID,
+      ReservationDefinition contract, String user, String planName,
+      long startTime, long endTime,
+      Map<ReservationInterval, ReservationRequest> allocationRequests,
+      ResourceCalculator calculator, Resource minAlloc) {
+    this.reservationID = reservationID;
+    this.contract = contract;
+    this.user = user;
+    this.planName = planName;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.allocationRequests = allocationRequests;
+    this.resourcesOverTime =
+        new RLESparseResourceAllocation(calculator, minAlloc);
+    for (Map.Entry<ReservationInterval, ReservationRequest> entry : allocationRequests
+        .entrySet()) {
+      ReservationRequest request = entry.getValue();
+      resourcesOverTime.addInterval(entry.getKey(), request);
+      if (request.getConcurrency() > 1) {
+        hasGang = true;
+      }
+    }
+  }
+
+  @Override
+  public ReservationId getReservationId() {
+    return reservationID;
+  }
+
+  @Override
+  public ReservationDefinition getReservationDefinition() {
+    return contract;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public long getEndTime() {
+    return endTime;
+  }
+
+  @Override
+  public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
+    return Collections.unmodifiableMap(allocationRequests);
+  }
+
+  @Override
+  public String getPlanName() {
+    return planName;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public boolean containsGangs() {
+    return hasGang;
+  }
+
+  @Override
+  public void setAcceptanceTimestamp(long acceptedAt) {
+    this.acceptedAt = acceptedAt;
+  }
+
+  @Override
+  public long getAcceptanceTime() {
+    return acceptedAt;
+  }
+
+  @Override
+  public Resource getResourcesAtTime(long tick) {
+    if (tick < startTime || tick >= endTime) {
+      return Resource.newInstance(0, 0);
+    }
+    return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
+  }
+
+  /**
+   * Renders the id, user, start time, end time and resources over time of
+   * this reservation.
+   *
+   * @return the textual representation of this reservation
+   */
+  @Override
+  public String toString() {
+    return String.valueOf(getReservationId()) + " user:" + getUser()
+        + " startTime: " + getStartTime() + " endTime: " + getEndTime()
+        + " alloc:[" + resourcesOverTime.toString() + "] ";
+  }
+
+  @Override
+  public int compareTo(ReservationAllocation other) {
+    // reverse order of acceptance
+    if (this.getAcceptanceTime() > other.getAcceptanceTime()) {
+      return -1;
+    }
+    if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return reservationID.hashCode();
+  }
+
+  /**
+   * Two allocations are equal when they are of the same class and carry
+   * equal reservation ids.
+   *
+   * @param obj the object to compare with
+   * @return true if obj is an allocation of the same class with an equal
+   *         reservation id
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    InMemoryReservationAllocation that = (InMemoryReservationAllocation) obj;
+    return this.reservationID.equals(that.getReservationId());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
new file mode 100644
index 0000000..cf2aed7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * A Plan represents the central data structure of a reservation system that
+ * maintains the "agenda" for the cluster. In particular, it maintains
+ * information on how a set of {@link ReservationDefinition} that have been
+ * previously accepted will be honored.
+ * 
+ * {@link ReservationDefinition} submitted by the users through the RM public
+ * APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
+ * consult the Plan (via the {@link PlanView} interface) and try to determine
+ * whether there are sufficient resources available in this Plan to satisfy the
+ * temporal and resource constraints of a {@link ReservationDefinition}. If a
+ * valid allocation is found the agent will try to store it in the plan (via the
+ * {@link PlanEdit} interface). Upon success the system returns to the user a
+ * positive acknowledgment, and a reservation identifier to be later used to
+ * access the reserved resources.
+ * 
+ * A {@link PlanFollower} will continuously read from the Plan and will
+ * affect the instantaneous allocation of resources among jobs running by
+ * publishing the "current" slice of the Plan to the underlying scheduler. I.e.,
+ * the configuration of queues/weights of the scheduler is modified to reflect
+ * the allocations in the Plan.
+ * 
+ * As this interface has several methods we decompose them into three groups:
+ * {@link PlanContext}: containing configuration type information,
+ * {@link PlanView}: read-only access to the plan state, and {@link PlanEdit}:
+ * write access to the plan state.
+ */
+public interface Plan extends PlanContext, PlanView, PlanEdit {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
new file mode 100644
index 0000000..40a25a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+/**
+ * This interface provides read-only access to configuration-type parameters
+ * of a plan.
+ * 
+ */
+public interface PlanContext {
+
+  /**
+   * Returns the configured "step" or granularity of time of the plan in millis.
+   * 
+   * @return plan step in millis
+   */
+  public long getStep();
+
+  /**
+   * Return the {@link ReservationAgent} configured for this plan that is
+   * responsible for optimally placing various reservation requests
+   * 
+   * @return the {@link ReservationAgent} configured for this plan
+   */
+  public ReservationAgent getReservationAgent();
+
+  /**
+   * Return an instance of a {@link Planner}, which will be invoked in response
+   * to unexpected reduction in the resources of this plan
+   * 
+   * @return an instance of a {@link Planner}, which will be invoked in response
+   *         to unexpected reduction in the resources of this plan
+   */
+  public Planner getReplanner();
+
+  /**
+   * Return the configured {@link SharingPolicy} that governs the sharing of the
+   * resources of the plan between its various users
+   * 
+   * @return the configured {@link SharingPolicy} that governs the sharing of
+   *         the resources of the plan between its various users
+   */
+  public SharingPolicy getSharingPolicy();
+
+  /**
+   * Returns the system {@link ResourceCalculator}
+   * 
+   * @return the system {@link ResourceCalculator}
+   */
+  public ResourceCalculator getResourceCalculator();
+
+  /**
+   * Returns the single smallest {@link Resource} allocation that can be
+   * reserved in this plan
+   * 
+   * @return the single smallest {@link Resource} allocation that can be
+   *         reserved in this plan
+   */
+  public Resource getMinimumAllocation();
+
+  /**
+   * Returns the single largest {@link Resource} allocation that can be reserved
+   * in this plan
+   * 
+   * @return the single largest {@link Resource} allocation that can be reserved
+   *         in this plan
+   */
+  public Resource getMaximumAllocation();
+
+  /**
+   * Return the name of the queue in the {@link ResourceScheduler} corresponding
+   * to this plan
+   * 
+   * @return the name of the queue in the {@link ResourceScheduler}
+   *         corresponding to this plan
+   */
+  public String getQueueName();
+
+  /**
+   * Return the {@link QueueMetrics} for the queue in the
+   * {@link ResourceScheduler} corresponding to this plan
+   * 
+   * @return the {@link QueueMetrics} for the queue in the
+   *         {@link ResourceScheduler} corresponding to this plan
+   */
+  public QueueMetrics getQueueMetrics();
+
+  /**
+   * Instructs the {@link PlanFollower} on what to do for applications
+   * which are still running when the reservation is expiring (move-to-default
+   * vs kill)
+   * 
+   * NOTE(review): the method name suggests that {@code true} means "move",
+   * which is the opposite of the description below; confirm against the
+   * {@link PlanFollower} implementation.
+   * 
+   * @return true if remaining applications have to be killed, false if they
+   *         have to be migrated
+   */
+  public boolean getMoveOnExpiry();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
new file mode 100644
index 0000000..648edba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * This interface groups the methods used to modify the state of a Plan.
+ */
+public interface PlanEdit extends PlanContext, PlanView {
+
+  /**
+   * Add a new {@link ReservationAllocation} to the plan
+   * 
+   * @param reservation the {@link ReservationAllocation} to be added to the
+   *          plan
+   * @return true if addition is successful, false otherwise
+   */
+  public boolean addReservation(ReservationAllocation reservation)
+      throws PlanningException;
+
+  /**
+   * Updates an existing {@link ReservationAllocation} in the plan. This is
+   * required for re-negotiation
+   * 
+   * @param reservation the {@link ReservationAllocation} to update in the plan
+   * @return true if update is successful, false otherwise
+   */
+  public boolean updateReservation(ReservationAllocation reservation)
+      throws PlanningException;
+
+  /**
+   * Delete an existing {@link ReservationAllocation} from the plan identified
+   * uniquely by its {@link ReservationId}. This will generally be used for
+   * garbage collection
+   * 
+   * @param reservationID the {@link ReservationId} uniquely identifying the
+   *          {@link ReservationAllocation} to be deleted from the plan
+   * @return true if delete is successful, false otherwise
+   */
+  public boolean deleteReservation(ReservationId reservationID)
+      throws PlanningException;
+
+  /**
+   * Method invoked to garbage collect old reservations. It cleans up expired
+   * reservations that have fallen out of the sliding archival window
+   * 
+   * @param tick the current time from which the archival window is computed
+   */
+  public void archiveCompletedReservations(long tick) throws PlanningException;
+
+  /**
+   * Sets the overall capacity in terms of {@link Resource} assigned to this
+   * plan
+   * 
+   * @param capacity the overall capacity in terms of {@link Resource} assigned
+   *          to this plan
+   */
+  public void setTotalCapacity(Resource capacity);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
new file mode 100644
index 0000000..6e58dde
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -0,0 +1,89 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This interface provides a read-only view on the allocations made in this
+ * plan. These methods are used for example by {@link ReservationAgent}s to
+ * determine the free resources in a certain point in time, and by
+ * PlanFollowerPolicy to publish this plan to the scheduler.
+ */
+public interface PlanView extends PlanContext {
+
+  /**
+   * Return a {@link ReservationAllocation} identified by its
+   * {@link ReservationId}
+   * 
+   * @param reservationID the unique id to identify the
+   *          {@link ReservationAllocation}
+   * @return {@link ReservationAllocation} identified by the specified id
+   */
+  public ReservationAllocation getReservationById(ReservationId reservationID);
+
+  /**
+   * Gets all the active reservations at the specified point of time
+   * 
+   * @param tick the time (UTC in ms) for which the active reservations are
+   *          requested
+   * @return set of active reservations at the specified time
+   */
+  public Set<ReservationAllocation> getReservationsAtTime(long tick);
+
+  /**
+   * Gets all the reservations in the plan
+   * 
+   * @return set of all reservations handled by this Plan
+   */
+  public Set<ReservationAllocation> getAllReservations();
+
+  /**
+   * Returns the total {@link Resource} reserved for all users at the specified
+   * time
+   * 
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the total {@link Resource} reserved for all users at the specified
+   *         time
+   */
+  public Resource getTotalCommittedResources(long tick);
+
+  /**
+   * Returns the total {@link Resource} reserved for a given user at the
+   * specified time
+   * 
+   * @param user the user who made the reservation(s)
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the total {@link Resource} reserved for a given user at the
+   *         specified time
+   */
+  public Resource getConsumptionForUser(String user, long tick);
+
+  /**
+   * Returns the overall capacity in terms of {@link Resource} assigned to this
+   * plan (typically will correspond to the absolute capacity of the
+   * corresponding queue).
+   * 
+   * @return the overall capacity in terms of {@link Resource} assigned to this
+   *         plan
+   */
+  public Resource getTotalCapacity();
+
+  /**
+   * Gets the time (UTC in ms) at which the first reservation starts
+   * 
+   * @return the time (UTC in ms) at which the first reservation starts
+   */
+  public long getEarliestStartTime();
+
+  /**
+   * Returns the time (UTC in ms) at which the last reservation terminates
+   * 
+   * @return the time (UTC in ms) at which the last reservation terminates
+   */
+  public long getLastEndTime();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
new file mode 100644
index 0000000..fa8db30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -0,0 +1,332 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * This is a run length encoded sparse data structure that maintains the
+ * cumulative resource allocation over time, keyed by the time of each change
+ */
+public class RLESparseResourceAllocation {
+
+  private static final int THRESHOLD = 100;
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private TreeMap<Long, Resource> cumulativeCapacity =
+      new TreeMap<Long, Resource>();
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private final ResourceCalculator resourceCalculator;
+  private final Resource minAlloc;
+
+  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
+      Resource minAlloc) {
+    this.resourceCalculator = resourceCalculator;
+    this.minAlloc = minAlloc;
+  }
+
+  private boolean isSameAsPrevious(Long key, Resource capacity) {
+    Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
+    return (previous != null && previous.getValue().equals(capacity));
+  }
+
+  private boolean isSameAsNext(Long key, Resource capacity) {
+    Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
+    return (next != null && next.getValue().equals(capacity));
+  }
+
+  /**
+   * Add a resource for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          added
+   * @param capacity the resource to be added
+   * @return true if addition is successful (currently never returns false)
+   */
+  public boolean addInterval(ReservationInterval reservationInterval,
+      ReservationRequest capacity) {
+    Resource totCap =
+        Resources.multiply(capacity.getCapability(),
+            (float) capacity.getNumContainers());
+    if (totCap.equals(ZERO_RESOURCE)) {
+      return true;
+    }
+    writeLock.lock();
+    try {
+      long startKey = reservationInterval.getStartTime();
+      long endKey = reservationInterval.getEndTime();
+      NavigableMap<Long, Resource> ticks =
+          cumulativeCapacity.headMap(endKey, false);
+      if (ticks != null && !ticks.isEmpty()) {
+        Resource updatedCapacity = Resource.newInstance(0, 0);
+        Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
+        if (lowEntry == null) {
+          // This is the earliest starting interval
+          cumulativeCapacity.put(startKey, totCap);
+        } else {
+          updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
+          // Add a new tick only if the updated value is different
+          // from the previous tick
+          if ((startKey == lowEntry.getKey())
+              && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
+            cumulativeCapacity.remove(lowEntry.getKey());
+          } else {
+            cumulativeCapacity.put(startKey, updatedCapacity);
+          }
+        }
+        // Increase all the capacities of overlapping intervals
+        Set<Entry<Long, Resource>> overlapSet =
+            ticks.tailMap(startKey, false).entrySet();
+        for (Entry<Long, Resource> entry : overlapSet) {
+          updatedCapacity = Resources.add(entry.getValue(), totCap);
+          entry.setValue(updatedCapacity);
+        }
+      } else {
+        // This is the first interval to be added
+        cumulativeCapacity.put(startKey, totCap);
+      }
+      Resource nextTick = cumulativeCapacity.get(endKey);
+      if (nextTick != null) {
+        // If there is overlap, remove the duplicate entry
+        if (isSameAsPrevious(endKey, nextTick)) {
+          cumulativeCapacity.remove(endKey);
+        }
+      } else {
+        // Decrease capacity as this is end of the interval
+        cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
+            .floorEntry(endKey).getValue(), totCap));
+      }
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Add multiple resources for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          added
+   * @param ReservationRequests the resources to be added
+   * @param clusterResource the total resources in the cluster
+   * @return true if addition is successful, false otherwise
+   */
+  public boolean addCompositeInterval(ReservationInterval reservationInterval,
+      List<ReservationRequest> ReservationRequests, Resource clusterResource) {
+    ReservationRequest aggregateReservationRequest =
+        Records.newRecord(ReservationRequest.class);
+    Resource capacity = Resource.newInstance(0, 0);
+    for (ReservationRequest ReservationRequest : ReservationRequests) {
+      Resources.addTo(capacity, Resources.multiply(
+          ReservationRequest.getCapability(),
+          ReservationRequest.getNumContainers()));
+    }
+    aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
+        .divide(resourceCalculator, clusterResource, capacity, minAlloc)));
+    aggregateReservationRequest.setCapability(minAlloc);
+
+    return addInterval(reservationInterval, aggregateReservationRequest);
+  }
+
+  /**
+   * Removes a resource for the specified interval
+   * 
+   * @param reservationInterval the interval for which the resource is to be
+   *          removed
+   * @param capacity the resource to be removed
+   * @return true if removal is successful (currently never returns false)
+   */
+  public boolean removeInterval(ReservationInterval reservationInterval,
+      ReservationRequest capacity) {
+    Resource totCap =
+        Resources.multiply(capacity.getCapability(),
+            (float) capacity.getNumContainers());
+    if (totCap.equals(ZERO_RESOURCE)) {
+      return true;
+    }
+    writeLock.lock();
+    try {
+      long startKey = reservationInterval.getStartTime();
+      long endKey = reservationInterval.getEndTime();
+      // only ticks strictly before the end key fall inside the interval
+      NavigableMap<Long, Resource> ticks =
+          cumulativeCapacity.headMap(endKey, false);
+      // Decrease all the capacities of overlapping intervals
+      SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
+      if (overlapSet != null && !overlapSet.isEmpty()) {
+        Resource updatedCapacity = Resource.newInstance(0, 0);
+        long currentKey = -1;
+        for (Iterator<Entry<Long, Resource>> overlapEntries =
+            overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
+          Entry<Long, Resource> entry = overlapEntries.next();
+          currentKey = entry.getKey();
+          updatedCapacity = Resources.subtract(entry.getValue(), totCap);
+          // update each entry between start and end key
+          cumulativeCapacity.put(currentKey, updatedCapacity);
+        }
+        // Remove the first overlap entry if, after the update, it is the
+        // same as the previous entry
+        Long firstKey = overlapSet.firstKey();
+        if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
+          cumulativeCapacity.remove(firstKey);
+        }
+        // Remove the next entry if it equals the last updated entry
+        if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
+          cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
+        }
+      }
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the capacity, i.e. total resources allocated at the specified point
+   * of time
+   * 
+   * @param tick the time (UTC in ms) at which the capacity is requested
+   * @return the resources allocated at the specified time
+   */
+  public Resource getCapacityAtTime(long tick) {
+    readLock.lock();
+    try {
+      Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick);
+      if (closestStep != null) {
+        return Resources.clone(closestStep.getValue());
+      }
+      return Resources.clone(ZERO_RESOURCE);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the timestamp of the earliest resource allocation
+   * 
+   * @return the timestamp of the first resource allocation
+   */
+  public long getEarliestStartTime() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return -1;
+      } else {
+        return cumulativeCapacity.firstKey();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the timestamp of the latest resource allocation
+   * 
+   * @return the timestamp of the last resource allocation
+   */
+  public long getLatestEndTime() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return -1;
+      } else {
+        return cumulativeCapacity.lastKey();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if there are no non-zero entries
+   * 
+   * @return true if there are no allocations or false otherwise
+   */
+  public boolean isEmpty() {
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.isEmpty()) {
+        return true;
+      }
+      // Deletion leaves a single zero entry so check for that
+      if (cumulativeCapacity.size() == 1) {
+        return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
+      }
+      return false;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder ret = new StringBuilder();
+    readLock.lock();
+    try {
+      if (cumulativeCapacity.size() > THRESHOLD) {
+        ret.append("Number of steps: ").append(cumulativeCapacity.size())
+            .append(" earliest entry: ").append(cumulativeCapacity.firstKey())
+            .append(" latest entry: ").append(cumulativeCapacity.lastKey());
+      } else {
+        for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
+          ret.append(r.getKey()).append(": ").append(r.getValue())
+              .append("\n ");
+        }
+      }
+      return ret.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the JSON string representation of the current resources allocated
+   * over time
+   * 
+   * @return the JSON string representation of the current resources allocated
+   *         over time
+   */
+  public String toMemJSONString() {
+    StringWriter json = new StringWriter();
+    JsonWriter jsonWriter = new JsonWriter(json);
+    readLock.lock();
+    try {
+      jsonWriter.beginObject();
+      // jsonWriter.name("timestamp").value("resource");
+      for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
+        jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
+      }
+      jsonWriter.endObject();
+      jsonWriter.close();
+      return json.toString();
+    } catch (IOException e) {
+      // Not expected: the writer targets an in-memory StringWriter
+      return "";
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
new file mode 100644
index 0000000..bca3aa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -0,0 +1,104 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * A ReservationAllocation represents a concrete allocation of resources over
+ * time that satisfy a certain {@link ReservationDefinition}. This is used
+ * internally by a {@link Plan} to store information about how each of the
+ * accepted {@link ReservationDefinition} have been allocated.
+ */
+public interface ReservationAllocation extends
+    Comparable<ReservationAllocation> {
+
+  /**
+   * Returns the unique identifier {@link ReservationId} that represents the
+   * reservation
+   * 
+   * @return the unique identifier {@link ReservationId} that represents the
+   *         reservation
+   */
+  public ReservationId getReservationId();
+
+  /**
+   * Returns the original {@link ReservationDefinition} submitted by the client
+   * 
+   * @return the original {@link ReservationDefinition} submitted by the client
+   */
+  public ReservationDefinition getReservationDefinition();
+
+  /**
+   * Returns the time at which the reservation is activated
+   * 
+   * @return the time at which the reservation is activated
+   */
+  public long getStartTime();
+
+  /**
+   * Returns the time at which the reservation terminates
+   * 
+   * @return the time at which the reservation terminates
+   */
+  public long getEndTime();
+
+  /**
+   * Returns the map of resources requested, keyed by the time interval for
+   * which they were requested
+   * 
+   * @return the map of resources requested, keyed by the time interval for
+   *         which they were requested
+   */
+  public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
+
+  /**
+   * Return a string identifying the plan to which the reservation belongs
+   * 
+   * @return the plan to which the reservation belongs
+   */
+  public String getPlanName();
+
+  /**
+   * Returns the user who requested the reservation
+   * 
+   * @return the user who requested the reservation
+   */
+  public String getUser();
+
+  /**
+   * Returns whether the reservation has gang semantics or not
+   * 
+   * @return true if there is a gang request, false otherwise
+   */
+  public boolean containsGangs();
+
+  /**
+   * Sets the time at which the reservation was accepted by the system
+   * 
+   * @param acceptedAt the time at which the reservation was accepted by the
+   *          system
+   */
+  public void setAcceptanceTimestamp(long acceptedAt);
+
+  /**
+   * Returns the time at which the reservation was accepted by the system
+   * 
+   * @return the time at which the reservation was accepted by the system
+   */
+  public long getAcceptanceTime();
+
+  /**
+   * Returns the capacity represented by cumulative resources reserved by the
+   * reservation at the specified point of time
+   * 
+   * @param tick the time (UTC in ms) for which the reserved resources are
+   *          requested
+   * @return the resources reserved at the specified time
+   */
+  public Resource getResourcesAtTime(long tick);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
new file mode 100644
index 0000000..d3a6d51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * This represents the time duration of the reservation as a start and an end
+ * time, both fixed at construction; ordering is by start time, then end time
+ */
+public class ReservationInterval implements Comparable<ReservationInterval> {
+
+  private final long startTime;
+
+  private final long endTime;
+
+  public ReservationInterval(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+  }
+
+  /**
+   * Get the start time of the reservation interval
+   * 
+   * @return the startTime
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the end time of the reservation interval
+   * 
+   * @return the endTime
+   */
+  public long getEndTime() {
+    return endTime;
+  }
+
+  /**
+   * Returns whether the interval is active at the specified instant of time
+   * 
+   * @param tick the instant of time to check (both bounds are inclusive)
+   * @return true if active, false otherwise
+   */
+  public boolean isOverlap(long tick) {
+    return (startTime <= tick && tick <= endTime);
+  }
+
+  @Override
+  public int compareTo(ReservationInterval anotherInterval) {
+    // Compare the values directly: subtracting two longs can overflow and
+    // flip the sign of the result when the timestamps are far apart
+    long mine = startTime;
+    long other = anotherInterval.getStartTime();
+    if (mine == other) {
+      mine = endTime;
+      other = anotherInterval.getEndTime();
+    }
+    if (mine == other) {
+      return 0;
+    }
+    return (mine < other) ? -1 : 1;
+  }
+
+  @Override
+  public String toString() {
+    return "[" + startTime + ", " + endTime + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
new file mode 100644
index 0000000..aa9e9fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
@@ -0,0 +1,25 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+
+/**
+ * Exception thrown by the admission control subsystem when there is a problem
+ * finding an allocation for a user's {@link ReservationSubmissionRequest}.
+ */
+public class PlanningException extends Exception {
+
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public PlanningException(String message) {
+    super(message);
+  }
+
+  public PlanningException(Throwable cause) {
+    super(cause);
+  }
+
+  public PlanningException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
new file mode 100644
index 0000000..cbca6dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -0,0 +1,210 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+/**
+ * Shared helpers for the reservation system unit tests: builds a spied
+ * {@link CapacityScheduler}, fills in queue configurations that contain a
+ * reservable queue, and generates reservation definitions and allocations.
+ */
+public class ReservationSystemTestUtil {
+
+  private static final Random rand = new Random();
+
+  /** Name of the top-level queue that the helpers mark as reservable. */
+  public final static String reservationQ = "dedicated";
+
+  /**
+   * Builds a reservation id out of two random longs.
+   *
+   * @return a new {@link ReservationId}
+   */
+  public static ReservationId getNewReservationId() {
+    return ReservationId.newInstance(rand.nextLong(), rand.nextLong());
+  }
+
+  /**
+   * Creates a Mockito spy of a {@link CapacityScheduler} that is initialized
+   * with the queues from
+   * {@link #setupQueueConfiguration(CapacitySchedulerConfiguration)} and whose
+   * {@code getClusterResource()} is stubbed to a fixed value.
+   *
+   * @param numContainers cluster size; the stubbed cluster resource is
+   *          {@code Resource.newInstance(numContainers * 1024, numContainers)}
+   * @return the spied scheduler, also returned by the spied context's
+   *         {@code getScheduler()}
+   * @throws IOException declared by the signature; presumably raised by the
+   *           setup calls -- TODO confirm
+   */
+  public CapacityScheduler mockCapacityScheduler(int numContainers)
+      throws IOException {
+    // adapted from TestCapacityScheduler
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+
+    CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
+    cs.setConf(new YarnConfiguration());
+    RMContext mockRmContext =
+        Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(conf),
+            new NMTokenSecretManagerInRM(conf),
+            new ClientToAMTokenSecretManagerInRM(), null));
+    cs.setRMContext(mockRmContext);
+    try {
+      cs.serviceInit(conf);
+    } catch (Exception e) {
+      // any initialization problem fails the calling test
+      Assert.fail(e.getMessage());
+    }
+    when(mockRmContext.getScheduler()).thenReturn(cs);
+    Resource r = Resource.newInstance(numContainers * 1024, numContainers);
+    // doReturn(..).when(..) stubs the spy without invoking the real method
+    doReturn(r).when(cs).getClusterResource();
+    return cs;
+  }
+
+  /**
+   * Populates {@code conf} with three top-level queues under root:
+   * {@code default} (capacity 10), {@code a} (capacity 10, with children
+   * {@code a1} = 30 and {@code a2} = 70) and {@link #reservationQ}
+   * (capacity 80, flagged as reservable).
+   *
+   * @param conf the configuration to fill in
+   */
+  public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    // Define default queue
+    final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
+    conf.setCapacity(defQ, 10);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
+        "default", "a", reservationQ });
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+
+    final String dedicated =
+        CapacitySchedulerConfiguration.ROOT
+            + CapacitySchedulerConfiguration.DOT + reservationQ;
+    conf.setCapacity(dedicated, 80);
+    // Set as reservation queue
+    conf.setReservableQueue(dedicated, true);
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, 30);
+    conf.setCapacity(A2, 70);
+  }
+
+  /**
+   * @return the path of the reservable queue: root, the queue separator and
+   *         {@link #reservationQ}
+   */
+  public String getFullReservationQueueName() {
+    return CapacitySchedulerConfiguration.ROOT
+        + CapacitySchedulerConfiguration.DOT + reservationQ;
+  }
+
+  /**
+   * @return the short name of the reservable queue, {@link #reservationQ}
+   */
+  public String getreservationQueueName() {
+    return reservationQ;
+  }
+
+  /**
+   * Populates {@code conf} with the layout of
+   * {@link #setupQueueConfiguration(CapacitySchedulerConfiguration)} plus one
+   * extra reservable top-level queue: {@code default} (capacity 5), {@code a}
+   * (capacity 5, with children {@code a1} = 30 and {@code a2} = 70),
+   * {@link #reservationQ} (capacity 80, reservable) and {@code newQ}
+   * (capacity 10, reservable).
+   *
+   * @param conf the configuration to fill in
+   * @param newQ short name of the additional reservable queue
+   */
+  public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
+      String newQ) {
+    // Define default queue
+    final String prefix =
+        CapacitySchedulerConfiguration.ROOT
+            + CapacitySchedulerConfiguration.DOT;
+    final String defQ = prefix + "default";
+    conf.setCapacity(defQ, 5);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
+        "default", "a", reservationQ, newQ });
+
+    final String A = prefix + "a";
+    conf.setCapacity(A, 5);
+
+    final String dedicated = prefix + reservationQ;
+    conf.setCapacity(dedicated, 80);
+    // Set as reservation queue
+    conf.setReservableQueue(dedicated, true);
+
+    conf.setCapacity(prefix + newQ, 10);
+    // Set as reservation queue
+    conf.setReservableQueue(prefix + newQ, true);
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, 30);
+    conf.setCapacity(A2, 70);
+  }
+
+  /**
+   * Generates a reservation definition holding one randomly sized request.
+   * The generator is re-seeded with {@code i}, so the random draws are
+   * repeatable for a given {@code i}; the absolute arrival and deadline still
+   * depend on the current wall-clock time.
+   *
+   * @param rand the generator to use; it is re-seeded by this call
+   * @param i the seed
+   * @return a definition arriving within the next 12 hours, with a deadline
+   *         less than 24 hours after its arrival
+   */
+  public static ReservationDefinition generateRandomRR(Random rand, long i) {
+    rand.setSeed(i);
+    long now = System.currentTimeMillis();
+
+    // start time at random in the next 12 hours
+    long arrival = rand.nextInt(12 * 3600 * 1000);
+    // deadline at random within a day after the arrival
+    long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(now + arrival);
+    rr.setDeadline(now + deadline);
+
+    int gang = 1 + rand.nextInt(9);
+    // always a whole multiple of the gang size
+    int par = (rand.nextInt(1000) + 1) * gang;
+    long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
+            gang, dur);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    // The result is discarded, but the call still advances the generator:
+    // removing it would change the interpreter picked for a given seed.
+    // NOTE(review): the purpose of this extra draw is unclear.
+    rand.nextInt(3);
+    ReservationRequestInterpreter[] type =
+        ReservationRequestInterpreter.values();
+    reqs.setInterpreter(type[rand.nextInt(type.length)]);
+    rr.setReservationRequests(reqs);
+
+    return rr;
+
+  }
+
+  /**
+   * Generates a reservation definition holding one large request: 100000
+   * with a gang size of 1 and a random duration below one minute. The
+   * generator is re-seeded with {@code i}, so the random draws are repeatable
+   * for a given {@code i}; the absolute arrival and deadline still depend on
+   * the current wall-clock time.
+   *
+   * @param rand the generator to use; it is re-seeded by this call
+   * @param i the seed
+   * @return a definition arriving within the next 2 hours, with a deadline
+   *         less than 24 hours after its arrival
+   */
+  public static ReservationDefinition generateBigRR(Random rand, long i) {
+    rand.setSeed(i);
+    long now = System.currentTimeMillis();
+
+    // start time at random in the next 2 hours
+    long arrival = rand.nextInt(2 * 3600 * 1000);
+    // deadline at random within a day after the arrival; it is offset by the
+    // arrival (as in generateRandomRR) so it can never precede the arrival
+    long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(now + arrival);
+    rr.setDeadline(now + deadline);
+
+    int gang = 1;
+    int par = 100000; // 100k tasks
+    long dur = rand.nextInt(60 * 1000); // tasks of up to 1min
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
+            gang, dur);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    // The result is discarded, but the call still advances the generator:
+    // removing it would change the interpreter picked for a given seed.
+    // NOTE(review): the purpose of this extra draw is unclear.
+    rand.nextInt(3);
+    ReservationRequestInterpreter[] type =
+        ReservationRequestInterpreter.values();
+    reqs.setInterpreter(type[rand.nextInt(type.length)]);
+    rr.setReservationRequests(reqs);
+
+    return rr;
+  }
+
+  /**
+   * Builds an allocation made of consecutive intervals of equal length, one
+   * per entry of {@code alloc}. Entry {@code i} maps the interval running from
+   * {@code startTime + i * step} to {@code startTime + (i + 1) * step} to a
+   * request created from {@code Resource.newInstance(1024, 1)} and
+   * {@code alloc[i]}.
+   *
+   * @param startTime start of the first interval
+   * @param step length of every interval
+   * @param alloc per-interval value passed as the second argument of
+   *          {@code ReservationRequest.newInstance}
+   * @return a {@link TreeMap} from interval to request; empty when
+   *         {@code alloc} is empty
+   */
+  public static Map<ReservationInterval, ReservationRequest> generateAllocation(
+      long startTime, long step, int[] alloc) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
+          * step), ReservationRequest.newInstance(
+          Resource.newInstance(1024, 1), alloc[i]));
+    }
+    return req;
+  }
+
+}


Mime
View raw message