aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Generalizing AcceptedOffer resource management.
Date Mon, 16 May 2016 17:27:26 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 485da81ce -> 5699c959f


Generalizing AcceptedOffer resource management.

Reviewed at https://reviews.apache.org/r/47261/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5699c959
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5699c959
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5699c959

Branch: refs/heads/master
Commit: 5699c959f630e224155a489ca24bfcc25fd1f5f3
Parents: 485da81
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Mon May 16 10:27:13 2016 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Mon May 16 10:27:13 2016 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/base/Numbers.java   |  10 +
 .../scheduler/mesos/MesosTaskFactory.java       |  10 +-
 .../scheduler/resources/AcceptedOffer.java      | 220 ++++---------------
 .../resources/MesosResourceConverter.java       | 116 +++++++++-
 .../scheduler/resources/ResourceManager.java    |  21 +-
 .../scheduler/resources/ResourceMapper.java     |  19 +-
 .../scheduler/resources/ResourceSlot.java       |  13 +-
 .../scheduler/resources/ResourceType.java       |   6 +-
 .../mesos/MesosTaskFactoryImplTest.java         |   9 +-
 .../scheduler/resources/AcceptedOfferTest.java  | 190 +++++-----------
 .../resources/MesosResourceConverterTest.java   |  85 +++++++
 .../scheduler/resources/ResourceTestUtil.java   |   2 +-
 12 files changed, 366 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/base/Numbers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Numbers.java b/src/main/java/org/apache/aurora/scheduler/base/Numbers.java
index 703ca3b..e84070c 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Numbers.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Numbers.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.base;
 
 import java.util.Set;
 
+import com.google.common.base.Function;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.ImmutableSet;
@@ -25,11 +26,20 @@ import com.google.common.collect.Sets;
 
 import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.mesos.Protos;
 
 /**
  * Utility class for working with numbers.
  */
 public final class Numbers {
+  /**
+   * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}.
+   */
+  public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
+      input -> Protos.Value.Range.newBuilder()
+          .setBegin(input.lowerEndpoint())
+          .setEnd(input.upperEndpoint())
+          .build();
 
   private Numbers() {
     // Utility class.

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index ef1b5bc..cbe2721 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -148,14 +147,13 @@ public interface MesosTaskFactory {
       try {
         acceptedOffer = AcceptedOffer.create(
             offer,
-            ResourceSlot.from(config),
-            executorSettings.getExecutorOverhead().toSlot(),
-            ImmutableSet.copyOf(task.getAssignedPorts().values()),
+            task,
+            executorSettings.getExecutorOverhead(),
             tierManager.getTier(task.getTask()));
       } catch (Resources.InsufficientResourcesException e) {
         throw new SchedulerException(e);
       }
-      List<Resource> resources = acceptedOffer.getTaskResources();
+      Iterable<Resource> resources = acceptedOffer.getTaskResources();
 
       LOG.debug(
           "Setting task resources to {}",
@@ -266,7 +264,7 @@ public interface MesosTaskFactory {
       ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
           .setExecutorId(getExecutorId(task.getTaskId()))
           .setSource(getInstanceSourceName(task.getTask(), task.getInstanceId()));
-      List<Resource> executorResources = acceptedOffer.getExecutorResources();
+      Iterable<Resource> executorResources = acceptedOffer.getExecutorResources();
       LOG.debug(
           "Setting executor resources to {}",
           Iterables.transform(executorResources, Protobufs::toString));

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
index fce6621..b28acd5 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
@@ -14,54 +14,37 @@
 package org.apache.aurora.scheduler.resources;
 
 import java.util.List;
-import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.StreamSupport;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 
-import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.base.Numbers;
-import org.apache.mesos.Protos;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+import static org.apache.aurora.scheduler.resources.ResourceManager.getOfferResources;
 
 /**
  * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo.
  */
 public final class AcceptedOffer {
-
-  public static final String DEFAULT_ROLE_NAME = "*";
-
   /**
    * Reserved resource filter.
    */
-  public static final Predicate<Resource> RESERVED =
-      e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME);
-
-  /**
-   * Non reserved resource filter.
-   */
-  public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED);
-
-  /**
-   * Helper function to check a resource value is small enough to be considered zero.
-   */
-  public static boolean nearZero(double value) {
-    return Math.abs(value) < EPSILON;
-  }
+  @VisibleForTesting
+  static final Predicate<Resource> RESERVED = e -> e.hasRole() && !e.getRole().equals("*");
 
   /**
    * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources.
    * @return A list of Resource used for TaskInfo.
    */
-  public List<Resource> getTaskResources() {
+  public Iterable<Resource> getTaskResources() {
     return taskResources;
   }
 
@@ -69,56 +52,51 @@ public final class AcceptedOffer {
    * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources.
    * @return A list of Resource used for ExecutorInfo.
    */
-  public List<Resource> getExecutorResources() {
+  public Iterable<Resource> getExecutorResources() {
     return executorResources;
   }
 
-  /**
-   * Use this epsilon value to avoid comparison with zero.
-   */
-  private static final double EPSILON = 1e-6;
-
-  private final List<Resource> taskResources;
-  private final List<Resource> executorResources;
+  private final Iterable<Resource> taskResources;
+  private final Iterable<Resource> executorResources;
 
   public static AcceptedOffer create(
       Offer offer,
-      ResourceSlot taskSlot,
-      ResourceSlot executorSlot,
-      Set<Integer> selectedPorts,
+      IAssignedTask task,
+      ResourceBag executorOverhead,
       TierInfo tierInfo) throws Resources.InsufficientResourcesException {
 
-    List<Resource> reservedFirst = ImmutableList.<Resource>builder()
-        .addAll(Iterables.filter(offer.getResourcesList(), RESERVED))
-        .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED))
-        .build();
-
-    boolean revocable = tierInfo.isRevocable();
-    List<Resource.Builder> cpuResources = filterToBuilders(
-        reservedFirst,
-        ResourceType.CPUS.getMesosName(),
-        revocable ? ResourceManager.REVOCABLE : ResourceManager.NON_REVOCABLE);
-    List<Resource.Builder> memResources = filterToBuilderNonRevocable(
-        reservedFirst, ResourceType.RAM_MB.getMesosName());
-    List<Resource.Builder> diskResources = filterToBuilderNonRevocable(
-        reservedFirst, ResourceType.DISK_MB.getMesosName());
-    List<Resource.Builder> portsResources = filterToBuilderNonRevocable(
-        reservedFirst, ResourceType.PORTS.getMesosName());
-
-    List<Resource> taskResources = ImmutableList.<Resource>builder()
-        .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable))
-        .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false))
-        .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false))
-        .addAll(allocateRangeType(portsResources, selectedPorts))
-        .build();
-
-    List<Resource> executorResources = ImmutableList.<Resource>builder()
-        .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable))
-        .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false))
-        .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false))
-        .build();
-
-    return new AcceptedOffer(taskResources, executorResources);
+    ImmutableList.Builder<Resource> taskResources = ImmutableList.builder();
+    ImmutableList.Builder<Resource> executorResources = ImmutableList.builder();
+
+    ResourceManager.bagFromResources(task.getTask().getResources())
+        .streamResourceVectors()
+        .forEach(entry -> {
+          ResourceType type = entry.getKey();
+          Iterable<Resource.Builder> offerResources = StreamSupport
+              .stream(getOfferResources(offer, tierInfo, entry.getKey()).spliterator(), false)
+              // Note the reverse order of args in .compare(): we want RESERVED resources first.
+              .sorted((l, r) -> Boolean.compare(RESERVED.test(r), RESERVED.test(l)))
+              .map(Resource::toBuilder)
+              .collect(toList());
+
+          boolean isRevocable = type.isMesosRevocable() && tierInfo.isRevocable();
+
+          taskResources.addAll(type.getMesosResourceConverter().toMesosResource(
+              offerResources,
+              type.getMapper().isPresent()
+                  ? () -> type.getMapper().get().getAssigned(task)
+                  : () -> entry.getValue(),
+              isRevocable));
+
+          if (executorOverhead.getResourceVectors().containsKey(type)) {
+            executorResources.addAll(type.getMesosResourceConverter().toMesosResource(
+                offerResources,
+                () -> executorOverhead.getResourceVectors().get(type),
+                isRevocable));
+          }
+        });
+
+    return new AcceptedOffer(taskResources.build(), executorResources.build());
   }
 
   private AcceptedOffer(
@@ -128,108 +106,4 @@ public final class AcceptedOffer {
     this.taskResources = requireNonNull(taskResources);
     this.executorResources = requireNonNull(executorResources);
   }
-
-  private static List<Resource> allocateRangeType(
-      List<Resource.Builder> from,
-      Set<Integer> valueSet) throws Resources.InsufficientResourcesException {
-
-    Set<Integer> leftOver = Sets.newHashSet(valueSet);
-    ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder();
-    for (Resource.Builder r : from) {
-      Set<Integer> fromResource = Sets.newHashSet(Iterables.concat(
-          Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS)));
-      Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource));
-      if (available.isEmpty()) {
-        continue;
-      }
-      Resource newResource = makeMesosRangeResource(r.build(), available);
-      result.add(newResource);
-      leftOver.removeAll(available);
-      if (leftOver.isEmpty()) {
-        break;
-      }
-    }
-    if (!leftOver.isEmpty()) {
-      // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
-      // consistent.
-      // Maybe we should consider implementing resource veto with this class to ensure that.
-      throw new Resources.InsufficientResourcesException(
-          "Insufficient resource for range type when allocating from offer");
-    }
-    return result.build();
-  }
-
-  /**
-   * Creates a mesos resource of integer ranges from given prototype.
-   *
-   * @param prototype Resource prototype.
-   * @param values    Values to translate into ranges.
-   * @return A new mesos ranges resource.
-   */
-  static Resource makeMesosRangeResource(
-      Resource prototype,
-      Set<Integer> values) {
-
-    return Protos.Resource.newBuilder(prototype)
-        .setRanges(Protos.Value.Ranges.newBuilder()
-            .addAllRange(
-                Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM)))
-        .build();
-  }
-
-  private static List<Resource> allocateScalarType(
-      List<Resource.Builder> from,
-      double amount,
-      boolean revocable) throws Resources.InsufficientResourcesException {
-
-    double remaining = amount;
-    ImmutableList.Builder<Resource> result = ImmutableList.builder();
-    for (Resource.Builder r : from) {
-      if (nearZero(remaining)) {
-        break;
-      }
-      final double available = r.getScalar().getValue();
-      if (nearZero(available)) {
-        // Skip resource slot that is already used up.
-        continue;
-      }
-      final double used = Math.min(remaining, available);
-      remaining -= used;
-      Resource.Builder newResource =
-          Resource.newBuilder(r.build())
-              .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build());
-      if (revocable) {
-        newResource.setRevocable(Resource.RevocableInfo.newBuilder());
-      }
-      result.add(newResource.build());
-      r.getScalarBuilder().setValue(available - used);
-    }
-    if (!nearZero(remaining)) {
-      // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
-      // consistent.
-      // Maybe we should consider implementing resource veto with this class to ensure that.
-      throw new Resources.InsufficientResourcesException(
-          "Insufficient resource when allocating from offer");
-    }
-    return result.build();
-  }
-
-  private static List<Resource.Builder> filterToBuilders(
-      List<Resource> resources,
-      String name,
-      Predicate<Resource> additionalFilter) {
-
-    return FluentIterable.from(resources)
-        .filter(e -> e.getName().equals(name))
-        .filter(additionalFilter)
-        .transform(Resource::toBuilder)
-        .toList();
-  }
-
-  private static List<Resource.Builder> filterToBuilderNonRevocable(
-      List<Resource> resources,
-      String name) {
-
-    return filterToBuilders(resources, name, ResourceManager.NON_REVOCABLE);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/MesosResourceConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/MesosResourceConverter.java b/src/main/java/org/apache/aurora/scheduler/resources/MesosResourceConverter.java
index f3fe05c..25d56b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/MesosResourceConverter.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/MesosResourceConverter.java
@@ -13,8 +13,19 @@
  */
 package org.apache.aurora.scheduler.resources;
 
+import java.util.Set;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.scheduler.base.Numbers;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Resource;
 
+import static org.apache.aurora.scheduler.base.Numbers.RANGE_TRANSFORM;
+
 /**
  * Converts Mesos resource values to be consumed in Aurora.
  */
@@ -28,14 +39,76 @@ public interface MesosResourceConverter {
    */
   Double quantify(Resource resource);
 
+  /**
+   * Allocates offer resources to resource request from {@code resourceRequest}.
+   *
+   * @param offerResources Offer resources to allocate.
+   * @param resourceRequest Resource request.
+   * @param isRevocable Flag indicating if allocated resources must be marked as Mesos-revocable.
+   * @return Allocated Mesos resources.
+   */
+  Iterable<Resource> toMesosResource(
+      Iterable<Resource.Builder> offerResources,
+      Supplier<?> resourceRequest,
+      boolean isRevocable);
+
   ScalarConverter SCALAR = new ScalarConverter();
   RangeConverter RANGES = new RangeConverter();
 
   class ScalarConverter implements MesosResourceConverter {
+    /**
+     * Helper function to check if a resource value is small enough to be considered zero.
+     */
+    private static boolean nearZero(double value) {
+      return Math.abs(value) < 1e-6;
+    }
+
     @Override
     public Double quantify(Resource resource) {
       return resource.getScalar().getValue();
     }
+
+    @Override
+    public Iterable<Resource> toMesosResource(
+        Iterable<Resource.Builder> offerResources,
+        Supplier<?> resourceRequest,
+        boolean isRevocable) {
+
+      double remaining = (Double) resourceRequest.get();
+      ImmutableList.Builder<Resource> result = ImmutableList.builder();
+      for (Resource.Builder offerResource : offerResources) {
+        if (nearZero(remaining)) {
+          break;
+        }
+
+        final double available = offerResource.getScalar().getValue();
+        if (nearZero(available)) {
+          // Skip resource slot that is already used up.
+          continue;
+        }
+
+        final double used = Math.min(remaining, available);
+        remaining -= used;
+        Resource.Builder newResource =
+            Resource.newBuilder(offerResource.build())
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build());
+
+        if (isRevocable) {
+          newResource.setRevocable(Resource.RevocableInfo.newBuilder());
+        }
+
+        result.add(newResource.build());
+        offerResource.getScalarBuilder().setValue(available - used);
+      }
+      if (!nearZero(remaining)) {
+        // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+        // consistent.
+        // Maybe we should consider implementing resource veto with this class to ensure that.
+        throw new Resources.InsufficientResourcesException(
+            "Insufficient resource when allocating from offer");
+      }
+      return result.build();
+    }
   }
 
   class RangeConverter implements MesosResourceConverter {
@@ -44,8 +117,49 @@ public interface MesosResourceConverter {
       return resource.getRanges().getRangeList().stream()
           .map(range -> 1 + range.getEnd() - range.getBegin())
           .reduce((l, r) -> l + r)
-          .map(v -> v.doubleValue())
+          .map(Long::doubleValue)
           .orElse(0.0);
     }
+
+    @Override
+    public Iterable<Resource> toMesosResource(
+        Iterable<Resource.Builder> offerResources,
+        Supplier<?> resourceRequest,
+        boolean isRevocable) {
+
+      @SuppressWarnings("unchecked")
+      Set<Integer> leftOver = Sets.newHashSet((Set<Integer>) resourceRequest.get());
+      ImmutableList.Builder<Resource> result = ImmutableList.builder();
+      for (Resource.Builder r : offerResources) {
+        Set<Integer> fromResource = Sets.newHashSet(Iterables.concat(
+            Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS)));
+        Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource));
+        if (available.isEmpty()) {
+          continue;
+        }
+
+        Resource.Builder newResource = Protos.Resource.newBuilder(r.build())
+            .setRanges(Protos.Value.Ranges.newBuilder()
+                .addAllRange(Iterables.transform(Numbers.toRanges(available), RANGE_TRANSFORM)));
+
+        if (isRevocable) {
+          newResource.setRevocable(Resource.RevocableInfo.newBuilder());
+        }
+
+        result.add(newResource.build());
+        leftOver.removeAll(available);
+        if (leftOver.isEmpty()) {
+          break;
+        }
+      }
+      if (!leftOver.isEmpty()) {
+        // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+        // consistent.
+        // Maybe we should consider implementing resource veto with this class to ensure that.
+        throw new Resources.InsufficientResourcesException(
+            "Insufficient resource for range type when allocating from offer");
+      }
+      return result.build();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index 3b38469..cc1d758 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -114,6 +114,22 @@ public final class ResourceManager {
   }
 
   /**
+   * Gets offer resources filtered by the {@code tierInfo} and {@code type}.
+   *
+   * @param offer Offer to get resources from.
+   * @param tierInfo Tier info.
+   * @param type Resource type.
+   * @return Offer resources filtered by {@code tierInfo} and {@code type}.
+   */
+  public static Iterable<Resource> getOfferResources(
+      Offer offer,
+      TierInfo tierInfo,
+      ResourceType type) {
+
+    return Iterables.filter(getOfferResources(offer, tierInfo), r -> fromResource(r).equals(type));
+  }
+
+  /**
    * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}.
    *
    * @param task Scheduled task to get resources from.
@@ -156,9 +172,10 @@ public final class ResourceManager {
    * @return Set of {@link ResourceType} instances representing task resources.
    */
   public static Set<ResourceType> getTaskResourceTypes(IAssignedTask task) {
-    return EnumSet.copyOf(task.getTask().getResources().stream()
+    Set<ResourceType> types = task.getTask().getResources().stream()
         .map(RESOURCE_TO_TYPE)
-        .collect(Collectors.toSet()));
+        .collect(Collectors.toSet());
+    return types.isEmpty() ? types : EnumSet.copyOf(types);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
index c8e11a4..ccfd997 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
@@ -17,10 +17,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Range;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -37,7 +39,7 @@ import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 /**
  * Maps requested (task) resources to available (offer) resources.
  */
-public interface ResourceMapper {
+public interface ResourceMapper<T> {
 
   /**
    * Maps task resources to offer resources and returns a new task with updated mapping.
@@ -48,9 +50,17 @@ public interface ResourceMapper {
    */
   IAssignedTask mapAndAssign(Offer offer, IAssignedTask task);
 
+  /**
+   * Gets assigned resource values stored in {@code task}.
+   *
+   * @param task Task to get assigned resources from.
+   * @return Assigned resource values.
+   */
+  T getAssigned(IAssignedTask task);
+
   PortMapper PORT_MAPPER = new PortMapper();
 
-  class PortMapper implements ResourceMapper {
+  class PortMapper implements ResourceMapper<Set<Integer>> {
     @Override
     public IAssignedTask mapAndAssign(Offer offer, IAssignedTask task) {
       List<Integer> availablePorts =
@@ -80,5 +90,10 @@ public interface ResourceMapper {
       builder.setAssignedPorts(ImmutableMap.copyOf(portMap));
       return IAssignedTask.build(builder);
     }
+
+    @Override
+    public Set<Integer> getAssigned(IAssignedTask task) {
+      return ImmutableSet.copyOf(task.getAssignedPorts().values());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
index dea7943..81a3bf8 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
@@ -20,10 +20,8 @@ import java.util.Set;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Range;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
@@ -60,15 +58,6 @@ public final class ResourceSlot {
   public static final ResourceSlot NONE =
       new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0);
 
-  /**
-   * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}.
-   */
-  public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
-      input -> Protos.Value.Range.newBuilder()
-          .setBegin(input.lowerEndpoint())
-          .setEnd(input.upperEndpoint())
-          .build();
-
   public ResourceSlot(
       double numCpus,
       Amount<Long, Data> ram,
@@ -156,7 +145,7 @@ public final class ResourceSlot {
         .setName(resourceType.getMesosName())
         .setType(Protos.Value.Type.RANGES)
         .setRanges(Protos.Value.Ranges.newBuilder()
-            .addAllRange(Iterables.transform(Numbers.toRanges(values), RANGE_TRANSFORM)))
+            .addAllRange(Iterables.transform(Numbers.toRanges(values), Numbers.RANGE_TRANSFORM)))
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
index 276320a..6a4f110 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
@@ -125,7 +125,7 @@ public enum ResourceType implements TEnum {
   /**
    * Optional resource mapper to use.
    */
-  private final Optional<ResourceMapper> mapper;
+  private final Optional<ResourceMapper<?>> mapper;
 
   /**
    * Aurora resource name.
@@ -177,7 +177,7 @@ public enum ResourceType implements TEnum {
       MesosResourceConverter mesosResourceConverter,
       String mesosName,
       AuroraResourceConverter<?> auroraResourceConverter,
-      Optional<ResourceMapper> mapper,
+      Optional<ResourceMapper<?>> mapper,
       String auroraName,
       String auroraUnit,
       int scalingRange,
@@ -241,7 +241,7 @@ public enum ResourceType implements TEnum {
    *
    * @return Optional ResourceMapper.
    */
-  public Optional<ResourceMapper> getMapper() {
+  public Optional<ResourceMapper<?>> getMapper() {
     return mapper;
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index cf4d350..b2dd7ad 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -149,12 +149,17 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
   private static ExecutorInfo populateDynamicFields(ExecutorInfo executor, IAssignedTask task) {
     return executor.toBuilder()
+        .clearResources()
         .setExecutorId(MesosTaskFactoryImpl.getExecutorId(task.getTaskId()))
         .setSource(
             MesosTaskFactoryImpl.getInstanceSourceName(task.getTask(), task.getInstanceId()))
         .build();
   }
 
+  private static ExecutorInfo makeComparable(ExecutorInfo executorInfo) {
+    return executorInfo.toBuilder().clearResources().build();
+  }
+
   private static ExecutorInfo purgeZeroResources(ExecutorInfo executor) {
     return executor.toBuilder()
         .clearResources()
@@ -176,7 +181,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
 
-    assertEquals(populateDynamicFields(DEFAULT_EXECUTOR, TASK), task.getExecutor());
+    assertEquals(populateDynamicFields(DEFAULT_EXECUTOR, TASK), makeComparable(task.getExecutor()));
     checkTaskResources(TASK.getTask(), task);
     checkDiscoveryInfoUnset(task);
   }
@@ -233,7 +238,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     assertEquals(
         purgeZeroResources(populateDynamicFields(
             NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK)),
-        task.getExecutor());
+        makeComparable(task.getExecutor()));
 
     // Simulate the upsizing needed for the task to meet the minimum thermos requirements.
     TaskConfig dummyTask = TASK.getTask().newBuilder();

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
index 36c5c11..2777d72 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
@@ -14,21 +14,24 @@
 package org.apache.aurora.scheduler.resources;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Resource;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -41,73 +44,36 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AcceptedOfferTest {
-
   private static final Optional<String> TEST_ROLE = Optional.of("test-role");
   private static final Optional<String> ABSENT_ROLE = Optional.absent();
-  private static final ResourceSlot TASK_SLOT = new ResourceSlot(
-      4, Amount.of(100L, Data.MB), Amount.of(200L, Data.MB), 0);
-  private static final ResourceSlot EXECUTOR_SLOT = new ResourceSlot(
-      0.25, Amount.of(25L, Data.MB), Amount.of(75L, Data.MB), 0);
-  private static final ResourceSlot TOTAL_SLOT = EXECUTOR_SLOT.add(TASK_SLOT);
-  private static final Integer[] TASK_PORTS = {80, 90};
-  private static final Set<Integer> TASK_PORTS_SET = ImmutableSet.copyOf(TASK_PORTS);
+  private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+  private static final ResourceBag EXECUTOR_BAG = bag(0.25, 25, 75);
+  private static final ResourceBag TOTAL_BAG =
+      EXECUTOR_BAG.add(bagFromResources(TASK.getTask().getResources()));
+  private static final Integer[] TASK_PORTS = {TASK.getAssignedPorts().get("http")};
 
   @Test
   public void testReservedPredicates() {
     Protos.Resource withRole = mesosScalar(CPUS, TEST_ROLE, false, 1.0);
-    assertTrue(AcceptedOffer.RESERVED.apply(withRole));
-    assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole));
+    assertTrue(AcceptedOffer.RESERVED.test(withRole));
+    assertFalse(AcceptedOffer.RESERVED.negate().test(withRole));
     Protos.Resource absentRole = mesosScalar(CPUS, ABSENT_ROLE, false, 1.0);
-    assertFalse(AcceptedOffer.RESERVED.apply(absentRole));
-    assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole));
+    assertFalse(AcceptedOffer.RESERVED.test(absentRole));
+    assertTrue(AcceptedOffer.RESERVED.negate().test(absentRole));
   }
 
   @Test
   public void testAllocateEmpty() {
     AcceptedOffer acceptedOffer = AcceptedOffer.create(
         offer(),
-        ResourceSlot.NONE,
-        ResourceSlot.NONE,
-        ImmutableSet.of(),
-        TaskTestUtil.DEV_TIER);
+        IAssignedTask.build(new AssignedTask().setTask(new TaskConfig())),
+        ResourceBag.EMPTY,
+        DEV_TIER);
     assertEquals(Collections.emptyList(), acceptedOffer.getTaskResources());
     assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
   }
 
   @Test
-  public void testAllocateRange() {
-    AcceptedOffer acceptedOffer = AcceptedOffer.create(
-        offer(
-            mesosRange(PORTS, Optional.absent(), 80, 81, 90, 91, 92, 93),
-            mesosRange(PORTS, TEST_ROLE, 100, 101)),
-        ResourceSlot.NONE,
-        ResourceSlot.NONE,
-        ImmutableSet.of(80, 90, 100),
-        TaskTestUtil.DEV_TIER);
-
-    List<Resource> expected = ImmutableList.<Resource>builder()
-        // Because we prefer reserved resources and handle them before non-reserved resources,
-        // result should have ports for the reserved resources first.
-        .add(mesosRange(PORTS, TEST_ROLE, 100))
-        .add(mesosRange(PORTS, Optional.absent(), 80, 90))
-        .build();
-    assertEquals(expected, acceptedOffer.getTaskResources());
-    assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
-  }
-
-  @Test(expected = Resources.InsufficientResourcesException.class)
-  public void testAllocateRangeInsufficent() {
-    AcceptedOffer.create(
-        offer(
-            mesosRange(PORTS, ABSENT_ROLE, 80),
-            mesosRange(PORTS, ABSENT_ROLE, 100, 101)),
-        ResourceSlot.NONE,
-        ResourceSlot.NONE,
-        ImmutableSet.of(80, 90, 100),
-        TaskTestUtil.DEV_TIER);
-  }
-
-  @Test
   public void testAllocateSingleRole() {
     runAllocateSingleRole(ABSENT_ROLE, false);
     runAllocateSingleRole(ABSENT_ROLE, true);
@@ -115,47 +81,31 @@ public class AcceptedOfferTest {
     runAllocateSingleRole(TEST_ROLE, true);
   }
 
-  private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) {
+  private void runAllocateSingleRole(Optional<String> role, boolean revocable) {
     Protos.Offer offer = offer(
-        mesosScalar(CPUS, role, cpuRevocable, TOTAL_SLOT.getNumCpus()),
-        mesosScalar(RAM_MB, role, false, TOTAL_SLOT.getRam().as(Data.MB)),
-        mesosScalar(DISK_MB, role, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+        mesosScalar(CPUS, TOTAL_BAG.valueOf(CPUS), revocable),
+        mesosScalar(RAM_MB, TOTAL_BAG.valueOf(RAM_MB), false),
+        mesosScalar(DISK_MB, TOTAL_BAG.valueOf(DISK_MB), false),
         mesosRange(PORTS, role, TASK_PORTS));
 
     AcceptedOffer offerAllocation = AcceptedOffer.create(
-        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
+        offer, TASK, EXECUTOR_BAG, new TierInfo(false, revocable));
 
-    List<Resource> taskList = ImmutableList.<Resource>builder()
-        .add(mesosScalar(CPUS, role, cpuRevocable, TASK_SLOT.getNumCpus()))
-        .add(mesosScalar(RAM_MB, role, false, TASK_SLOT.getRam().as(Data.MB)))
-        .add(mesosScalar(
-            DISK_MB, role, false, TASK_SLOT.getDisk().as(Data.MB)))
+    ResourceBag bag = bagFromResources(TASK.getTask().getResources());
+    Set<Resource> taskResources = ImmutableSet.<Resource>builder()
+        .add(mesosScalar(CPUS, bag.valueOf(CPUS), revocable))
+        .add(mesosScalar(RAM_MB, bag.valueOf(RAM_MB), false))
+        .add(mesosScalar(DISK_MB, bag.valueOf(DISK_MB), false))
         .add(mesosRange(PORTS, role, TASK_PORTS))
         .build();
-    assertEquals(taskList, offerAllocation.getTaskResources());
+    assertEquals(taskResources, ImmutableSet.copyOf(offerAllocation.getTaskResources()));
 
-    List<Resource> executorList = ImmutableList.<Resource>builder()
-        .add(mesosScalar(
-            CPUS, role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
-        .add(mesosScalar(
-            RAM_MB, role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
-        .add(mesosScalar(
-            DISK_MB, role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+    Set<Resource> executorResources = ImmutableSet.<Resource>builder()
+        .add(mesosScalar(CPUS, EXECUTOR_BAG.valueOf(CPUS), revocable))
+        .add(mesosScalar(RAM_MB, EXECUTOR_BAG.valueOf(RAM_MB), false))
+        .add(mesosScalar(DISK_MB, EXECUTOR_BAG.valueOf(DISK_MB), false))
         .build();
-    assertEquals(executorList, offerAllocation.getExecutorResources());
-  }
-
-  @Test(expected = Resources.InsufficientResourcesException.class)
-  public void testAllocateSingleRoleInsufficient() {
-    Protos.Offer offer = offer(
-        // EXECUTOR_SLOT's CPU is not included here.
-        mesosScalar(CPUS, TEST_ROLE, false, TASK_SLOT.getNumCpus()),
-        mesosScalar(RAM_MB, TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
-        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
-        mesosRange(PORTS, TEST_ROLE, TASK_PORTS));
-
-    AcceptedOffer.create(
-        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
+    assertEquals(executorResources, ImmutableSet.copyOf(offerAllocation.getExecutorResources()));
   }
 
   @Test
@@ -164,65 +114,39 @@ public class AcceptedOfferTest {
     runMultipleRoles(true);
   }
 
-  private void runMultipleRoles(boolean cpuRevocable) {
+  private void runMultipleRoles(boolean revocable) {
+    ResourceBag bag = bagFromResources(TASK.getTask().getResources());
     Protos.Offer offer = offer(
         // Make cpus come from two roles.
-        mesosScalar(CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()),
-        mesosScalar(CPUS, ABSENT_ROLE, cpuRevocable, TASK_SLOT.getNumCpus()),
+        mesosScalar(CPUS, TEST_ROLE, revocable, EXECUTOR_BAG.valueOf(CPUS)),
+        mesosScalar(CPUS, ABSENT_ROLE, revocable, bag.valueOf(CPUS)),
         // Make ram come from default role
-        mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
+        mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_BAG.valueOf(RAM_MB)),
         // Make disk come from non-default role.
-        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
-        mesosRange(PORTS, TEST_ROLE, 80),
-        mesosRange(PORTS, ABSENT_ROLE, 90));
+        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_BAG.valueOf(DISK_MB)),
+        mesosRange(PORTS, TEST_ROLE, TASK_PORTS));
 
     AcceptedOffer offerAllocation = AcceptedOffer.create(
-        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
+        offer, TASK, EXECUTOR_BAG, new TierInfo(false, revocable));
 
-    List<Resource> taskList = ImmutableList.<Resource>builder()
-        // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus.
-        .add(mesosScalar(
-            CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+    Set<Resource> taskSet = ImmutableSet.<Resource>builder()
+        .add(mesosScalar(CPUS, TEST_ROLE, revocable, EXECUTOR_BAG.valueOf(CPUS)))
         .add(mesosScalar(
             CPUS,
             ABSENT_ROLE,
-            cpuRevocable,
-            TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus()))
-        .add(mesosScalar(
-            RAM_MB, ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
-        .add(mesosScalar(
-            DISK_MB, TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
-        .add(mesosRange(PORTS, TEST_ROLE, 80))
-        .add(mesosRange(PORTS, ABSENT_ROLE, 90))
+            revocable,
+            bag.subtract(EXECUTOR_BAG).valueOf(CPUS)))
+        .add(mesosScalar(RAM_MB, ABSENT_ROLE, false, bag.valueOf(RAM_MB)))
+        .add(mesosScalar(DISK_MB, TEST_ROLE, false, bag.valueOf(DISK_MB)))
+        .add(mesosRange(PORTS, TEST_ROLE, TASK_PORTS))
         .build();
-    assertEquals(taskList, offerAllocation.getTaskResources());
+    assertEquals(taskSet, ImmutableSet.copyOf(offerAllocation.getTaskResources()));
 
-    List<Resource> executorList = ImmutableList.<Resource>builder()
-        .add(mesosScalar(
-            CPUS, ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
-        .add(mesosScalar(
-            RAM_MB, ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
-        .add(mesosScalar(
-            DISK_MB, TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+    Set<Resource> executorSet = ImmutableSet.<Resource>builder()
+        .add(mesosScalar(CPUS, ABSENT_ROLE, revocable, EXECUTOR_BAG.valueOf(CPUS)))
+        .add(mesosScalar(RAM_MB, ABSENT_ROLE, false, EXECUTOR_BAG.valueOf(RAM_MB)))
+        .add(mesosScalar(DISK_MB, TEST_ROLE, false, EXECUTOR_BAG.valueOf(DISK_MB)))
         .build();
-    assertEquals(executorList, offerAllocation.getExecutorResources());
-  }
-
-  @Test(expected = Resources.InsufficientResourcesException.class)
-  public void testMultipleRolesInsufficient() {
-    Protos.Offer offer = offer(
-        // Similar to testMultipleRoles, but make some of cpus as revocable
-        // Make cpus come from two roles.
-        mesosScalar(CPUS, TEST_ROLE, true, EXECUTOR_SLOT.getNumCpus()),
-        mesosScalar(CPUS, ABSENT_ROLE, false, TASK_SLOT.getNumCpus()),
-        // Make ram come from default role
-        mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
-        // Make disk come from non-default role.
-        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
-        mesosRange(PORTS, TEST_ROLE, 80),
-        mesosRange(PORTS, ABSENT_ROLE, 90));
-    // We don't have enough resource to satisfy a non-revocable request.
-    AcceptedOffer.create(
-        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
+    assertEquals(executorSet, ImmutableSet.copyOf(offerAllocation.getExecutorResources()));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/test/java/org/apache/aurora/scheduler/resources/MesosResourceConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/MesosResourceConverterTest.java b/src/test/java/org/apache/aurora/scheduler/resources/MesosResourceConverterTest.java
index d4bb5aa..a8c93f9 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/MesosResourceConverterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/MesosResourceConverterTest.java
@@ -13,6 +13,13 @@
  */
 package org.apache.aurora.scheduler.resources;
 
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.mesos.Protos.Resource;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.resources.MesosResourceConverter.RANGES;
@@ -21,6 +28,7 @@ import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.junit.Assert.assertEquals;
 
 public class MesosResourceConverterTest {
@@ -43,4 +51,81 @@ public class MesosResourceConverterTest {
   public void testQuantifyRangeDefaultValue() {
     assertEquals(0, RANGES.quantify(mesosRange(PORTS)).doubleValue(), 0.0);
   }
+
+  @Test
+  public void testAllocateRange() {
+    List<Resource> expected = ImmutableList.<Resource>builder()
+        .add(mesosRange(PORTS, Optional.absent(), 80))
+        .add(mesosRange(PORTS, Optional.absent(), 90, 100))
+        .build();
+
+    Iterable<Resource> actual = RANGES.toMesosResource(
+        ImmutableSet.of(
+            mesosRange(PORTS, Optional.absent(), 79).toBuilder(),
+            mesosRange(PORTS, Optional.absent(), 80).toBuilder(),
+            mesosRange(PORTS, Optional.absent(), 80, 90, 91, 92, 100).toBuilder()),
+        () -> ImmutableSet.of(80, 90, 100),
+        false);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testAllocateRangeRevocable() {
+    Resource.Builder builder = mesosRange(PORTS, Optional.absent(), 80).toBuilder()
+        .setRevocable(Resource.RevocableInfo.newBuilder());
+
+    List<Resource> expected = ImmutableList.<Resource>builder().add(builder.build()).build();
+
+    Iterable<Resource> actual = RANGES.toMesosResource(
+        ImmutableSet.of(builder),
+        () -> ImmutableSet.of(80),
+        true);
+    assertEquals(expected, actual);
+  }
+
+  @Test(expected = Resources.InsufficientResourcesException.class)
+  public void testAllocateRangeInsufficent() {
+    RANGES.toMesosResource(
+        ImmutableSet.of(mesosRange(PORTS, Optional.absent(), 80, 81, 90, 91, 92).toBuilder()),
+        () -> ImmutableSet.of(80, 90, 100),
+        false);
+  }
+
+  @Test
+  public void testAllocateScalar() {
+    List<Resource> expected = ImmutableList.<Resource>builder()
+        .add(mesosScalar(CPUS, 31.9999999))
+        .build();
+
+    Iterable<Resource> actual = SCALAR.toMesosResource(
+        ImmutableSet.of(
+            mesosScalar(CPUS, 0.0000001).toBuilder(),
+            mesosScalar(CPUS, 31.9999999).toBuilder(),
+            mesosScalar(CPUS, 15).toBuilder()),
+        () -> 32.0,
+        false);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testAllocateScalarRevocable() {
+    Resource.Builder builder = mesosScalar(RAM_MB, 128.0).toBuilder()
+        .setRevocable(Resource.RevocableInfo.newBuilder());
+
+    List<Resource> expected = ImmutableList.<Resource>builder().add(builder.build()).build();
+
+    Iterable<Resource> actual = SCALAR.toMesosResource(
+        ImmutableSet.of(builder),
+        () -> 128.0,
+        true);
+    assertEquals(expected, actual);
+  }
+
+  @Test(expected = Resources.InsufficientResourcesException.class)
+  public void testAllocateScalarInsufficent() {
+    SCALAR.toMesosResource(
+        ImmutableSet.of(mesosScalar(RAM_MB, 32.0).toBuilder()),
+        () -> 128.0,
+        false);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5699c959/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
index ba597b8..be7bd32 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
@@ -110,7 +110,7 @@ public final class ResourceTestUtil {
         .setRanges(Protos.Value.Ranges.newBuilder().addAllRange(
             Iterables.transform(
                 Numbers.toRanges(ImmutableSet.copyOf(values)),
-                ResourceSlot.RANGE_TRANSFORM)))
+                Numbers.RANGE_TRANSFORM)))
         .build();
   }
 


Mime
View raw message