hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject hadoop git commit: YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh)
Date Sat, 21 Nov 2015 18:00:23 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk a30eccb38 -> da1016365


YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da101636
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da101636
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da101636

Branch: refs/heads/trunk
Commit: da1016365aba1cee9c06771ab142d077379f27af
Parents: a30eccb
Author: Arun Suresh <asuresh@apache.org>
Authored: Sat Nov 21 09:59:41 2015 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Sat Nov 21 09:59:41 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reservation/InMemoryPlan.java               |   6 +-
 .../InMemoryReservationAllocation.java          |   2 +-
 .../RLESparseResourceAllocation.java            | 380 ++++++++++++++-----
 .../reservation/planning/IterativePlanner.java  |   6 +-
 .../planning/StageAllocatorLowCostAligned.java  |   3 +-
 .../TestRLESparseResourceAllocation.java        | 353 ++++++++++++++++-
 7 files changed, 626 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9de331c..29eca0c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -563,6 +563,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4279. Mark ApplicationId and ApplicationAttemptId static methods as @Public,
     @Unstable. (stevel)
 
+    YARN-3454. Add efficient merge operation to RLESparseResourceAllocation
+    (Carlo Curino via asuresh)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 7e2567b..af42df9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -108,7 +108,7 @@ public class InMemoryPlan implements Plan {
     this.resCalc = resCalc;
     this.minAlloc = minAlloc;
     this.maxAlloc = maxAlloc;
-    this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
+    this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
     this.queueName = queueName;
     this.replanner = replanner;
     this.getMoveOnExpiry = getMoveOnExpiry;
@@ -129,7 +129,7 @@ public class InMemoryPlan implements Plan {
     String user = reservation.getUser();
     RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
     if (resAlloc == null) {
-      resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
+      resAlloc = new RLESparseResourceAllocation(resCalc);
       userResourceAlloc.put(user, resAlloc);
     }
     for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
@@ -492,7 +492,7 @@ public class InMemoryPlan implements Plan {
   public long getLastEndTime() {
     readLock.lock();
     try {
-      return rleSparseVector.getLatestEndTime();
+      return rleSparseVector.getLatestNonNullTime();
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 42a2243..55ab066 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -67,7 +67,7 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
     this.allocationRequests = allocations;
     this.planName = planName;
     this.hasGang = hasGang;
-    resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
+    resourcesOverTime = new RLESparseResourceAllocation(calculator);
     for (Map.Entry<ReservationInterval, Resource> r : allocations
         .entrySet()) {
       resourcesOverTime.addInterval(r.getKey(), r.getValue());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 80f2ff7..63defb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -24,13 +24,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -43,9 +42,9 @@ import com.google.gson.stream.JsonWriter;
 public class RLESparseResourceAllocation {
 
   private static final int THRESHOLD = 100;
-  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+  private static final Resource ZERO_RESOURCE = Resources.none();
 
-  private TreeMap<Long, Resource> cumulativeCapacity =
+  private NavigableMap<Long, Resource> cumulativeCapacity =
       new TreeMap<Long, Resource>();
 
   private final ReentrantReadWriteLock readWriteLock =
@@ -54,26 +53,20 @@ public class RLESparseResourceAllocation {
   private final Lock writeLock = readWriteLock.writeLock();
 
   private final ResourceCalculator resourceCalculator;
-  private final Resource minAlloc;
 
-  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
-      Resource minAlloc) {
+  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) {
     this.resourceCalculator = resourceCalculator;
-    this.minAlloc = minAlloc;
   }
 
-  private boolean isSameAsPrevious(Long key, Resource capacity) {
-    Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
-    return (previous != null && previous.getValue().equals(capacity));
-  }
-
-  private boolean isSameAsNext(Long key, Resource capacity) {
-    Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
-    return (next != null && next.getValue().equals(capacity));
+  public RLESparseResourceAllocation(NavigableMap<Long, Resource> out,
+      ResourceCalculator resourceCalculator) {
+    // NOTE: repeated (redundant) entries in the supplied map are not checked
+    this.cumulativeCapacity = out;
+    this.resourceCalculator = resourceCalculator;
   }
 
   /**
-   * Add a resource for the specified interval
+   * Add a resource for the specified interval.
    *
    * @param reservationInterval the interval for which the resource is to be
    *          added
@@ -87,48 +80,15 @@ public class RLESparseResourceAllocation {
     }
     writeLock.lock();
     try {
-      long startKey = reservationInterval.getStartTime();
-      long endKey = reservationInterval.getEndTime();
-      NavigableMap<Long, Resource> ticks =
-          cumulativeCapacity.headMap(endKey, false);
-      if (ticks != null && !ticks.isEmpty()) {
-        Resource updatedCapacity = Resource.newInstance(0, 0);
-        Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
-        if (lowEntry == null) {
-          // This is the earliest starting interval
-          cumulativeCapacity.put(startKey, totCap);
-        } else {
-          updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
-          // Add a new tick only if the updated value is different
-          // from the previous tick
-          if ((startKey == lowEntry.getKey())
-              && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
-            cumulativeCapacity.remove(lowEntry.getKey());
-          } else {
-            cumulativeCapacity.put(startKey, updatedCapacity);
-          }
-        }
-        // Increase all the capacities of overlapping intervals
-        Set<Entry<Long, Resource>> overlapSet =
-            ticks.tailMap(startKey, false).entrySet();
-        for (Entry<Long, Resource> entry : overlapSet) {
-          updatedCapacity = Resources.add(entry.getValue(), totCap);
-          entry.setValue(updatedCapacity);
-        }
-      } else {
-        // This is the first interval to be added
-        cumulativeCapacity.put(startKey, totCap);
-      }
-      Resource nextTick = cumulativeCapacity.get(endKey);
-      if (nextTick != null) {
-        // If there is overlap, remove the duplicate entry
-        if (isSameAsPrevious(endKey, nextTick)) {
-          cumulativeCapacity.remove(endKey);
-        }
-      } else {
-        // Decrease capacity as this is end of the interval
-        cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
-            .floorEntry(endKey).getValue(), totCap));
+      NavigableMap<Long, Resource> addInt = new TreeMap<Long, Resource>();
+      addInt.put(reservationInterval.getStartTime(), totCap);
+      addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
+      try {
+        cumulativeCapacity =
+            merge(resourceCalculator, totCap, cumulativeCapacity, addInt,
+                Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add);
+      } catch (PlanningException e) {
+        // never happens for add
       }
       return true;
     } finally {
@@ -137,7 +97,7 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Removes a resource for the specified interval
+   * Removes a resource for the specified interval.
    *
    * @param reservationInterval the interval for which the resource is to be
    *          removed
@@ -151,34 +111,16 @@ public class RLESparseResourceAllocation {
     }
     writeLock.lock();
     try {
-      long startKey = reservationInterval.getStartTime();
-      long endKey = reservationInterval.getEndTime();
-      // update the start key
-      NavigableMap<Long, Resource> ticks =
-          cumulativeCapacity.headMap(endKey, false);
-      // Decrease all the capacities of overlapping intervals
-      SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
-      if (overlapSet != null && !overlapSet.isEmpty()) {
-        Resource updatedCapacity = Resource.newInstance(0, 0);
-        long currentKey = -1;
-        for (Iterator<Entry<Long, Resource>> overlapEntries =
-            overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
-          Entry<Long, Resource> entry = overlapEntries.next();
-          currentKey = entry.getKey();
-          updatedCapacity = Resources.subtract(entry.getValue(), totCap);
-          // update each entry between start and end key
-          cumulativeCapacity.put(currentKey, updatedCapacity);
-        }
-        // Remove the first overlap entry if it is same as previous after
-        // updation
-        Long firstKey = overlapSet.firstKey();
-        if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
-          cumulativeCapacity.remove(firstKey);
-        }
-        // Remove the next entry if it is same as end entry after updation
-        if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
-          cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
-        }
+
+      NavigableMap<Long, Resource> removeInt = new TreeMap<Long, Resource>();
+      removeInt.put(reservationInterval.getStartTime(), totCap);
+      removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
+      try {
+        cumulativeCapacity =
+            merge(resourceCalculator, totCap, cumulativeCapacity, removeInt,
+                Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract);
+      } catch (PlanningException e) {
+        // never happens for subtract
       }
       return true;
     } finally {
@@ -188,9 +130,8 @@ public class RLESparseResourceAllocation {
 
   /**
    * Returns the capacity, i.e. total resources allocated at the specified point
-   * of time
+   * of time.
    *
-   * @param tick the time (UTC in ms) at which the capacity is requested
    * @return the resources allocated at the specified time
    */
   public Resource getCapacityAtTime(long tick) {
@@ -207,7 +148,7 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Get the timestamp of the earliest resource allocation
+   * Get the timestamp of the earliest resource allocation.
    *
    * @return the timestamp of the first resource allocation
    */
@@ -225,17 +166,24 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Get the timestamp of the latest resource allocation
+   * Get the timestamp of the latest non-null resource allocation.
    *
    * @return the timestamp of the last resource allocation
    */
-  public long getLatestEndTime() {
+  public long getLatestNonNullTime() {
     readLock.lock();
     try {
       if (cumulativeCapacity.isEmpty()) {
         return -1;
       } else {
-        return cumulativeCapacity.lastKey();
+        // the last entry might contain null (to terminate
+        // the sequence)... return previous one.
+        Entry<Long, Resource> last = cumulativeCapacity.lastEntry();
+        if (last.getValue() == null) {
+          return cumulativeCapacity.floorKey(last.getKey() - 1);
+        } else {
+          return last.getKey();
+        }
       }
     } finally {
       readLock.unlock();
@@ -243,7 +191,7 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Returns true if there are no non-zero entries
+   * Returns true if there are no non-zero entries.
    *
    * @return true if there are no allocations or false otherwise
    */
@@ -253,9 +201,11 @@ public class RLESparseResourceAllocation {
       if (cumulativeCapacity.isEmpty()) {
         return true;
       }
-      // Deletion leaves a single zero entry so check for that
-      if (cumulativeCapacity.size() == 1) {
-        return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
+      // Deletion leaves a single zero entry with a null at the end so check for
+      // that
+      if (cumulativeCapacity.size() == 2) {
+        return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE)
+            && cumulativeCapacity.lastEntry().getValue() == null;
       }
       return false;
     } finally {
@@ -286,7 +236,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Returns the JSON string representation of the current resources allocated
-   * over time
+   * over time.
    *
    * @return the JSON string representation of the current resources allocated
    *         over time
@@ -314,7 +264,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Returns the representation of the current resources allocated over time as
-   * an interval map.
+   * an interval map (in the defined non-null range).
    *
    * @return the representation of the current resources allocated over time as
    *         an interval map.
@@ -334,7 +284,7 @@ public class RLESparseResourceAllocation {
       Map.Entry<Long, Resource> lastEntry = null;
       for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
 
-        if (lastEntry != null) {
+        if (lastEntry != null && entry.getValue() != null) {
           ReservationInterval interval =
               new ReservationInterval(lastEntry.getKey(), entry.getKey());
           Resource resource = lastEntry.getValue();
@@ -348,7 +298,235 @@ public class RLESparseResourceAllocation {
     } finally {
       readLock.unlock();
     }
+  }
 
+  public NavigableMap<Long, Resource> getCumulative() {
+    readLock.lock();
+    try {
+      return cumulativeCapacity;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Merges the range start to end of two {@code RLESparseResourceAllocation}
+   * using a given {@code RLEOperator}.
+   *
+   * @param resCalc the resource calculator
+   * @param clusterResource the total cluster resources (for DRF)
+   * @param a the left operand
+   * @param b the right operand
+   * @param operator the operator to be applied during merge
+   * @param start the start-time of the range to be considered
+   * @param end the end-time of the range to be considered
+   * @return a merged RLESparseResourceAllocation, produced by applying
+   *         "operator" to "a" and "b"
+   * @throws PlanningException in case the operator is subtractTestNonNegative
+   *           and the result would contain a negative value
+   */
+  public static RLESparseResourceAllocation merge(ResourceCalculator resCalc,
+      Resource clusterResource, RLESparseResourceAllocation a,
+      RLESparseResourceAllocation b, RLEOperator operator, long start, long end)
+      throws PlanningException {
+    NavigableMap<Long, Resource> cumA =
+        a.getRangeOverlapping(start, end).getCumulative();
+    NavigableMap<Long, Resource> cumB =
+        b.getRangeOverlapping(start, end).getCumulative();
+    NavigableMap<Long, Resource> out =
+        merge(resCalc, clusterResource, cumA, cumB, start, end, operator);
+    return new RLESparseResourceAllocation(out, resCalc);
+  }
+
+  private static NavigableMap<Long, Resource> merge(ResourceCalculator resCalc,
+      Resource clusterResource, NavigableMap<Long, Resource> a,
+      NavigableMap<Long, Resource> b, long start, long end,
+      RLEOperator operator) throws PlanningException {
+
+    // handle special cases of empty input
+    if (a == null || a.isEmpty()) {
+      if (operator == RLEOperator.subtract
+          || operator == RLEOperator.subtractTestNonNegative) {
+        return negate(operator, b);
+      } else {
+        return b;
+      }
+    }
+    if (b == null || b.isEmpty()) {
+      return a;
+    }
+
+    // define iterators and support variables
+    Iterator<Entry<Long, Resource>> aIt = a.entrySet().iterator();
+    Iterator<Entry<Long, Resource>> bIt = b.entrySet().iterator();
+    Entry<Long, Resource> curA = aIt.next();
+    Entry<Long, Resource> curB = bIt.next();
+    Entry<Long, Resource> lastA = null;
+    Entry<Long, Resource> lastB = null;
+    boolean aIsDone = false;
+    boolean bIsDone = false;
+
+    TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
+
+    while (!(curA.equals(lastA) && curB.equals(lastB))) {
+
+      Resource outRes;
+      long time = -1;
+
+      // curA is smaller than curB
+      if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) {
+        outRes = combineValue(operator, resCalc, clusterResource, curA, lastB);
+        time = (curA.getKey() < start) ? start : curA.getKey();
+        lastA = curA;
+        if (aIt.hasNext()) {
+          curA = aIt.next();
+        } else {
+          aIsDone = true;
+        }
+
+      } else {
+        // curB is smaller than curA
+        if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) {
+          outRes =
+              combineValue(operator, resCalc, clusterResource, lastA, curB);
+          time = (curB.getKey() < start) ? start : curB.getKey();
+          lastB = curB;
+          if (bIt.hasNext()) {
+            curB = bIt.next();
+          } else {
+            bIsDone = true;
+          }
+
+        } else {
+          // curA is equal to curB
+          outRes = combineValue(operator, resCalc, clusterResource, curA, curB);
+          time = (curA.getKey() < start) ? start : curA.getKey();
+          lastA = curA;
+          if (aIt.hasNext()) {
+            curA = aIt.next();
+          } else {
+            aIsDone = true;
+          }
+          lastB = curB;
+          if (bIt.hasNext()) {
+            curB = bIt.next();
+          } else {
+            bIsDone = true;
+          }
+        }
+      }
+
+      // add to out if not redundant
+      addIfNeeded(out, time, outRes);
+    }
+    addIfNeeded(out, end, null);
+
+    return out;
+  }
+
+  private static NavigableMap<Long, Resource> negate(RLEOperator operator,
+      NavigableMap<Long, Resource> a) throws PlanningException {
+
+    TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
+    for (Entry<Long, Resource> e : a.entrySet()) {
+      Resource val = Resources.negate(e.getValue());
+      // test for negative value and throw
+      if (operator == RLEOperator.subtractTestNonNegative
+          && (Resources.fitsIn(val, ZERO_RESOURCE) &&
+              !Resources.equals(val, ZERO_RESOURCE))) {
+        throw new PlanningException(
+            "RLESparseResourceAllocation: merge failed as the "
+                + "resulting RLESparseResourceAllocation would be negative");
+      }
+      out.put(e.getKey(), val);
+    }
+
+    return out;
+  }
+
+  private static void addIfNeeded(TreeMap<Long, Resource> out, long time,
+      Resource outRes) {
+
+    if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
+        || !Resources.equals(out.lastEntry().getValue(), outRes)) {
+      out.put(time, outRes);
+    }
+
+  }
+
+  private static Resource combineValue(RLEOperator op,
+      ResourceCalculator resCalc, Resource clusterResource,
+      Entry<Long, Resource> eA, Entry<Long, Resource> eB)
+      throws PlanningException {
+
+    // deal with nulls
+    if (eA == null || eA.getValue() == null) {
+      if (eB == null || eB.getValue() == null) {
+        return null;
+      }
+      if (op == RLEOperator.subtract) {
+        return Resources.negate(eB.getValue());
+      } else {
+        return eB.getValue();
+      }
+    }
+    if (eB == null || eB.getValue() == null) {
+      return eA.getValue();
+    }
+
+    Resource a = eA.getValue();
+    Resource b = eB.getValue();
+    switch (op) {
+    case add:
+      return Resources.add(a, b);
+    case subtract:
+      return Resources.subtract(a, b);
+    case subtractTestNonNegative:
+      if (!Resources.fitsIn(b, a)) {
+        throw new PlanningException(
+            "RLESparseResourceAllocation: merge failed as the "
+                + "resulting RLESparseResourceAllocation would be negative");
+      } else {
+        return Resources.subtract(a, b);
+      }
+    case min:
+      return Resources.min(resCalc, clusterResource, a, b);
+    case max:
+      return Resources.max(resCalc, clusterResource, a, b);
+    default:
+      return null;
+    }
+
+  }
+
+  public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
+    readLock.lock();
+    try {
+      NavigableMap<Long, Resource> a = this.getCumulative();
+
+      if (a != null && !a.isEmpty()) {
+        // include the portion of previous entry that overlaps start
+        if (start > a.firstKey()) {
+          long previous = a.floorKey(start);
+          a = a.tailMap(previous, true);
+        }
+        a = a.headMap(end, true);
+      }
+      RLESparseResourceAllocation ret =
+          new RLESparseResourceAllocation(a, resourceCalculator);
+      return ret;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  /**
+   * The set of operators that can be applied to two
+   * {@code RLESparseResourceAllocation} during a merge operation.
+   */
+  public enum RLEOperator {
+    add, subtract, min, max, subtractTestNonNegative
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 342c2e7..d05b0ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -95,8 +95,7 @@ public class IterativePlanner extends PlanningAlgorithm {
 
     // Create the allocations data structure
     RLESparseResourceAllocation allocations =
-        new RLESparseResourceAllocation(plan.getResourceCalculator(),
-            plan.getMinimumAllocation());
+        new RLESparseResourceAllocation(plan.getResourceCalculator());
 
     // Get a reverse iterator for the set of stages
     ListIterator<ReservationRequest> li =
@@ -219,8 +218,7 @@ public class IterativePlanner extends PlanningAlgorithm {
 
     // Initialize the plan modifications
     planModifications =
-        new RLESparseResourceAllocation(plan.getResourceCalculator(),
-            plan.getMinimumAllocation());
+        new RLESparseResourceAllocation(plan.getResourceCalculator());
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index 4b5763d..04cce7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -69,8 +69,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
 
     // Create allocationRequestsearlies
     RLESparseResourceAllocation allocationRequests =
-        new RLESparseResourceAllocation(plan.getResourceCalculator(),
-            plan.getMinimumAllocation());
+        new RLESparseResourceAllocation(plan.getResourceCalculator());
 
     // Initialize parameters
     long duration = stepRoundUp(rr.getDuration(), step);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da101636/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index f0cc49c..85fafa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -17,17 +17,25 @@
  *******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,12 +46,248 @@ public class TestRLESparseResourceAllocation {
       .getLogger(TestRLESparseResourceAllocation.class);
 
   @Test
+  public void testMergeAdd() throws PlanningException {
+
+    TreeMap<Long, Resource> a = new TreeMap<>();
+    TreeMap<Long, Resource> b = new TreeMap<>();
+
+    setupArrays(a, b);
+
+    RLESparseResourceAllocation rleA =
+        new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleB =
+        new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    RLESparseResourceAllocation out =
+        RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+            Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+            RLEOperator.add, 18, 45);
+
+    System.out.println(out);
+
+    long[] time = { 18, 20, 22, 30, 33, 40, 43, 45 };
+    int[] alloc = { 10, 15, 20, 25, 30, 40, 30 };
+
+    validate(out, time, alloc);
+  }
+
+  @Test
+  public void testMergeMin() throws PlanningException {
+
+    TreeMap<Long, Resource> a = new TreeMap<>();
+    TreeMap<Long, Resource> b = new TreeMap<>();
+
+    setupArrays(a, b);
+
+    RLESparseResourceAllocation rleA =
+        new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleB =
+        new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    RLESparseResourceAllocation out =
+        RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+            Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+            RLEOperator.min, 0, 60);
+
+    System.out.println(out);
+
+    long[] time = { 10, 22, 33, 40, 43, 50, 60 };
+    int[] alloc = { 5, 10, 15, 20, 10, 0 };
+
+    validate(out, time, alloc);
+
+  }
+
+  @Test
+  public void testMergeMax() throws PlanningException {
+
+    TreeMap<Long, Resource> a = new TreeMap<>();
+    TreeMap<Long, Resource> b = new TreeMap<>();
+
+    setupArrays(a, b);
+
+    RLESparseResourceAllocation rleA =
+        new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleB =
+        new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    RLESparseResourceAllocation out =
+        RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+            Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+            RLEOperator.max, 0, 60);
+
+    System.out.println(out);
+
+    long[] time = { 10, 20, 30, 40, 50, 60 };
+    int[] alloc = { 5, 10, 15, 20, 10 };
+
+    validate(out, time, alloc);
+
+  }
+
+  @Test
+  public void testMergeSubtract() throws PlanningException {
+
+    TreeMap<Long, Resource> a = new TreeMap<>();
+    TreeMap<Long, Resource> b = new TreeMap<>();
+
+    setupArrays(a, b);
+
+    RLESparseResourceAllocation rleA =
+        new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleB =
+        new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    RLESparseResourceAllocation out =
+        RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+            Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+            RLEOperator.subtract, 0, 60);
+
+    System.out.println(out);
+
+    long[] time = { 10, 11, 20, 22, 30, 33, 43, 50, 60 };
+    int[] alloc = { 5, 0, 5, 0, 5, 0, 10, -10 };
+
+    validate(out, time, alloc);
+
+  }
+
+  @Test
+  public void testMergesubtractTestNonNegative() throws PlanningException {
+
+    // starting with default array example
+    TreeMap<Long, Resource> a = new TreeMap<>();
+    TreeMap<Long, Resource> b = new TreeMap<>();
+
+    setupArrays(a, b);
+
+    RLESparseResourceAllocation rleA =
+        new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    RLESparseResourceAllocation rleB =
+        new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    try {
+      RLESparseResourceAllocation out =
+          RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+              Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+              RLEOperator.subtractTestNonNegative, 0, 60);
+      fail();
+    } catch (PlanningException pe) {
+      // Expected!
+    }
+
+    // NOTE a is empty!! so the subtraction is implicitly considered negative
+    // and the test should fail
+
+    a = new TreeMap<>();
+    b = new TreeMap<>();
+    b.put(11L, Resource.newInstance(5, 6));
+
+    rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    try {
+      RLESparseResourceAllocation out =
+          RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+              Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+              RLEOperator.subtractTestNonNegative, 0, 60);
+      fail();
+    } catch (PlanningException pe) {
+      // Expected!
+    }
+
+    // Testing that the subtractTestNonNegative detects problems even if only one
+    // of the resource dimensions is "<0"
+    a.put(10L, Resource.newInstance(10, 5));
+    b.put(11L, Resource.newInstance(5, 6));
+
+    rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    try {
+      RLESparseResourceAllocation out =
+          RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+              Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+              RLEOperator.subtractTestNonNegative, 0, 60);
+      fail();
+    } catch (PlanningException pe) {
+      // Expected!
+    }
+
+    // try with reverse setting
+    a.put(10L, Resource.newInstance(5, 10));
+    b.put(11L, Resource.newInstance(6, 5));
+
+    rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    try {
+      RLESparseResourceAllocation out =
+          RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+              Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+              RLEOperator.subtractTestNonNegative, 0, 60);
+      fail();
+    } catch (PlanningException pe) {
+      // Expected!
+    }
+
+    // trying a case that should work
+    a.put(10L, Resource.newInstance(10, 6));
+    b.put(11L, Resource.newInstance(5, 6));
+
+    rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+    rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+    RLESparseResourceAllocation out =
+        RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+            Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+            RLEOperator.subtractTestNonNegative, 0, 60);
+
+  }
+
+  @Test
+  @Ignore
+  public void testMergeSpeed() throws PlanningException {
+
+    for (int j = 0; j < 100; j++) {
+      TreeMap<Long, Resource> a = new TreeMap<>();
+      TreeMap<Long, Resource> b = new TreeMap<>();
+      Random rand = new Random();
+      long startA = 0;
+      long startB = 0;
+
+      for (int i = 0; i < 1000 + rand.nextInt(9000); i++) {
+        startA += rand.nextInt(100);
+        startB += rand.nextInt(100);
+        a.put(startA,
+            Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
+        b.put(startB,
+            Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
+      }
+
+      RLESparseResourceAllocation rleA =
+          new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
+      RLESparseResourceAllocation rleB =
+          new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
+
+      long start = System.currentTimeMillis();
+      RLESparseResourceAllocation out =
+          RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
+              Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
+              RLEOperator.add, Long.MIN_VALUE, Long.MAX_VALUE);
+      long end = System.currentTimeMillis();
+
+      System.out.println(" Took: " + (end - start) + "ms ");
+    }
+
+  }
+
+  @Test
   public void testBlocks() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
-    Resource minAlloc = Resource.newInstance(1, 1);
 
     RLESparseResourceAllocation rleSparseVector =
-        new RLESparseResourceAllocation(resCalc, minAlloc);
+        new RLESparseResourceAllocation(resCalc);
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     Set<Entry<ReservationInterval, Resource>> inputs =
@@ -75,12 +319,63 @@ public class TestRLESparseResourceAllocation {
   }
 
   @Test
+  public void testPartialRemoval() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc);
+
+    ReservationInterval riAdd = new ReservationInterval(10, 20);
+    Resource rr = Resource.newInstance(1024 * 100, 100);
+
+    ReservationInterval riAdd2 = new ReservationInterval(20, 30);
+
+    Resource rr2 = Resource.newInstance(1024 * 200, 200);
+
+    ReservationInterval riRemove = new ReservationInterval(12, 25);
+    // same if we use this
+    // ReservationRequest rrRemove =
+    // ReservationRequest.newInstance(Resource.newInstance(1024, 1), 100, 1,6);
+    LOG.info(rleSparseVector.toString());
+
+    rleSparseVector.addInterval(riAdd, rr);
+    rleSparseVector.addInterval(riAdd2, rr2);
+    LOG.info(rleSparseVector.toString());
+
+    rleSparseVector.removeInterval(riRemove, rr);
+    LOG.info(rleSparseVector.toString());
+
+    // Current bug prevents this to pass. The RLESparseResourceAllocation
+    // does not handle removal of "partial"
+    // allocations correctly.
+    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10)
+        .getMemory());
+    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
+    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemory());
+    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21)
+        .getMemory());
+    Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26)
+        .getMemory());
+
+    ReservationInterval riRemove2 = new ReservationInterval(9, 13);
+    rleSparseVector.removeInterval(riRemove2, rr);
+    LOG.info(rleSparseVector.toString());
+
+    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemory());
+    Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9)
+        .getMemory());
+    Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
+    Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20)
+        .getMemory());
+
+  }
+
+  @Test
   public void testSteps() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
-    Resource minAlloc = Resource.newInstance(1, 1);
 
     RLESparseResourceAllocation rleSparseVector =
-        new RLESparseResourceAllocation(resCalc, minAlloc);
+        new RLESparseResourceAllocation(resCalc);
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
     Set<Entry<ReservationInterval, Resource>> inputs =
@@ -102,7 +397,7 @@ public class TestRLESparseResourceAllocation {
     Assert.assertEquals(Resource.newInstance(0, 0),
         rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
     for (Entry<ReservationInterval, Resource> ip : inputs) {
-      rleSparseVector.removeInterval(ip.getKey(),ip.getValue());
+      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
     for (int i = 0; i < alloc.length; i++) {
@@ -115,10 +410,9 @@ public class TestRLESparseResourceAllocation {
   @Test
   public void testSkyline() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
-    Resource minAlloc = Resource.newInstance(1, 1);
 
     RLESparseResourceAllocation rleSparseVector =
-        new RLESparseResourceAllocation(resCalc, minAlloc);
+        new RLESparseResourceAllocation(resCalc);
     int[] alloc = { 0, 5, 10, 10, 5, 0 };
     int start = 100;
     Set<Entry<ReservationInterval, Resource>> inputs =
@@ -151,11 +445,10 @@ public class TestRLESparseResourceAllocation {
   }
 
   @Test
-  public void testZeroAlloaction() {
+  public void testZeroAllocation() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
-    Resource minAlloc = Resource.newInstance(1, 1);
     RLESparseResourceAllocation rleSparseVector =
-        new RLESparseResourceAllocation(resCalc, minAlloc);
+        new RLESparseResourceAllocation(resCalc);
     rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
         Resource.newInstance(0, 0));
     LOG.info(rleSparseVector.toString());
@@ -167,9 +460,8 @@ public class TestRLESparseResourceAllocation {
   @Test
   public void testToIntervalMap() {
     ResourceCalculator resCalc = new DefaultResourceCalculator();
-    Resource minAlloc = Resource.newInstance(1, 1);
     RLESparseResourceAllocation rleSparseVector =
-        new RLESparseResourceAllocation(resCalc, minAlloc);
+        new RLESparseResourceAllocation(resCalc);
     Map<ReservationInterval, Resource> mapAllocations;
 
     // Check empty
@@ -186,8 +478,7 @@ public class TestRLESparseResourceAllocation {
     }
     mapAllocations = rleSparseVector.toIntervalMap();
     Assert.assertTrue(mapAllocations.size() == 5);
-    for (Entry<ReservationInterval, Resource> entry : mapAllocations
-        .entrySet()) {
+    for (Entry<ReservationInterval, Resource> entry : mapAllocations.entrySet()) {
       ReservationInterval interval = entry.getKey();
       Resource resource = entry.getValue();
       if (interval.getStartTime() == 101L) {
@@ -211,8 +502,38 @@ public class TestRLESparseResourceAllocation {
     }
   }
 
-  private Map<ReservationInterval, Resource> generateAllocation(
-      int startTime, int[] alloc, boolean isStep) {
+  private void setupArrays(TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
+    a.put(10L, Resource.newInstance(5, 5));
+    a.put(20L, Resource.newInstance(10, 10));
+    a.put(30L, Resource.newInstance(15, 15));
+    a.put(40L, Resource.newInstance(20, 20));
+    a.put(50L, Resource.newInstance(0, 0));
+
+    b.put(11L, Resource.newInstance(5, 5));
+    b.put(22L, Resource.newInstance(10, 10));
+    b.put(33L, Resource.newInstance(15, 15));
+    b.put(40L, Resource.newInstance(20, 20));
+    b.put(42L, Resource.newInstance(20, 20));
+    b.put(43L, Resource.newInstance(10, 10));
+  }
+
+  private void validate(RLESparseResourceAllocation out, long[] time,
+      int[] alloc) {
+    int i = 0;
+    for (Entry<Long, Resource> res : out.getCumulative().entrySet()) {
+      assertEquals(time[i], ((long) res.getKey()));
+      if (i > alloc.length - 1) {
+        assertNull(res.getValue());
+      } else {
+        assertEquals(alloc[i], res.getValue().getVirtualCores());
+      }
+      i++;
+    }
+    assertEquals(time.length, i);
+  }
+
+  private Map<ReservationInterval, Resource> generateAllocation(int startTime,
+      int[] alloc, boolean isStep) {
     Map<ReservationInterval, Resource> req =
         new HashMap<ReservationInterval, Resource>();
     int numContainers = 0;


Mime
View raw message