myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dar...@apache.org
Subject [2/3] incubator-myriad git commit: Refactor which addresses Myriad 213, 214, and 136 in the process. -Refactored ExecutorCommandLineGenerator classes to use this class (resolves Myriad-214 in the process). -Refactor TackFactory classes as necessary to wo
Date Fri, 12 Aug 2016 13:32:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java
deleted file mode 100644
index ace9928..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.myriad.scheduler;
-
-/**
- * Generic interface to represent some constraints that task can impose
- * while figuring out whether to accept or reject the offer
- * We may start small and then eventually add more constraints
- */
-public interface TaskConstraints {
-
-  /**
-   * Required number of ports
-   *
-   * @return portsNumber
-   */
-  public int portsCount();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java
deleted file mode 100644
index 0665190..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Factory class to keep map of the constraints
- */
-public class TaskConstraintsManager {
-
-  /**
-   * Since all the additions will happen during init time, there is no need to make this map Concurrent
-   * if/when later on it will change we may need to change HashMap to Concurrent one
-   */
-  private Map<String, TaskConstraints> taskConstraintsMap = new HashMap<>();
-
-  public TaskConstraints getConstraints(String taskPrefix) {
-    return taskConstraintsMap.get(taskPrefix);
-  }
-
-  public void addTaskConstraints(final String taskPrefix, final TaskConstraints taskConstraints) {
-    if (taskConstraints != null) {
-      taskConstraintsMap.put(taskPrefix, taskConstraints);
-    }
-  }
-
-  public boolean exists(String taskPrefix) {
-    return taskConstraintsMap.containsKey(taskPrefix);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
index 6b398a3..7e63e0d 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
@@ -9,233 +9,151 @@
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-package org.apache.myriad.scheduler;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
 
-import javax.inject.Inject;
+package org.apache.myriad.scheduler;
 
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Protos.Value.Range;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.mesos.Protos;
 import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.MyriadContainerConfiguration;
+import org.apache.myriad.configuration.MyriadDockerConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
 import org.apache.myriad.state.NodeTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * Creates Tasks based upon Mesos offers
+ * Base class to create Tasks based upon Mesos offers
  */
-public interface TaskFactory {
+public abstract class TaskFactory {
+  public static final String EXECUTOR_NAME = "myriad_task";
+  public static final String EXECUTOR_PREFIX = "myriad_executor";
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TaskFactory.class);
+
   static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname";
   static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address";
   static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address";
   static final String YARN_HTTP_POLICY = "yarn.http.policy";
   static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
 
-  TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask);
+  private static final String CONTAINER_PATH_KEY = "containerPath";
+  private static final String HOST_PATH_KEY = "hostPath";
+  private static final String RW_MODE = "mode";
+  private static final String PARAMETER_KEY_KEY = "key";
+  private static final String PARAMETER_VALUE_KEY = "value";
 
-  // TODO(Santosh): This is needed because the ExecutorInfo constructed
-  // to launch NM needs to be specified to launch placeholder tasks for
-  // yarn containers (for fine grained scaling).
-  // If mesos supports just specifying the 'ExecutorId' without the full
-  // ExecutorInfo, we wouldn't need this interface method.
-  ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo);
+  protected MyriadConfiguration cfg;
+  protected TaskUtils taskUtils;
+  protected ExecutorCommandLineGenerator clGenerator;
 
-  /**
-   * Creates TaskInfo objects to launch NMs as mesos tasks.
-   */
-  class NMTaskFactoryImpl implements TaskFactory {
-    public static final String EXECUTOR_NAME = "myriad_task";
-    public static final String EXECUTOR_PREFIX = "myriad_executor";
-    public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class);
-    private static final Random rand = new Random();
-    private MyriadConfiguration cfg;
-    private TaskUtils taskUtils;
-    private ExecutorCommandLineGenerator clGenerator;
-
-    @Inject
-    public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) {
-      this.cfg = cfg;
-      this.taskUtils = taskUtils;
-      this.clGenerator = clGenerator;
-    }
+  public TaskFactory() {
 
-    @VisibleForTesting
-    protected static HashSet<Long> getNMPorts(Resource resource) {
-      HashSet<Long> ports = new HashSet<>();
-      if (resource.getName().equals("ports")) {
-        /*
-        ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only.
-        so must loop though each range until we get all ports needed.  We exit each loop as soon as all
-        ports are found so bounded by NMPorts.expectedNumPorts.
-        */
-        final List<Range> ranges = resource.getRanges().getRangeList();
-        final List<Long> allAvailablePorts = new ArrayList<>();
-        for (Range range : ranges) {
-          if (range.hasBegin() && range.hasEnd()) {
-            for (long i = range.getBegin(); i <= range.getEnd(); i++) {
-              allAvailablePorts.add(i);
-            }
-          }
-        }
+  }
 
-        Preconditions.checkState(allAvailablePorts.size() >= NMPorts.expectedNumPorts(), "Not enough ports in offer");
+  @Inject
+  public TaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) {
+    this.cfg = cfg;
+    this.taskUtils = taskUtils;
+    this.clGenerator = clGenerator;
+  }
 
-        while (ports.size() < NMPorts.expectedNumPorts()) {
-          int portIndex = rand.nextInt(allAvailablePorts.size());
-          ports.add(allAvailablePorts.get(portIndex));
-          allAvailablePorts.remove(portIndex);
-        }
-      }
-      return ports;
-    }
+  public abstract Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId,
+                                             Protos.TaskID taskId, NodeTask nodeTask);
 
-    //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer
-    @VisibleForTesting
-    protected static NMPorts getPorts(Offer offer) {
-      Set<Long> ports = new HashSet<>();
-      for (Resource resource : offer.getResourcesList()) {
-        if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*"))) {
-          ports = getNMPorts(resource);
-          break;
+  // TODO(Santosh): This is needed because the ExecutorInfo constructed
+  // to launch NM needs to be specified to launch placeholder tasks for
+  // yarn containers (for fine grained scaling).
+  // If mesos supports just specifying the 'ExecutorId' without the full
+  // ExecutorInfo, we wouldn't need this interface method.
+  public abstract Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo);
+
+  protected Iterable<Protos.Volume> getVolumes(Iterable<Map<String, String>> volume) {
+    return Iterables.transform(volume, new Function<Map<String, String>, Protos.Volume>() {
+      @Nullable
+      @Override
+      public Protos.Volume apply(Map<String, String> map) {
+        Preconditions.checkArgument(map.containsKey(HOST_PATH_KEY) && map.containsKey(CONTAINER_PATH_KEY));
+        Protos.Volume.Mode mode = Protos.Volume.Mode.RO;
+        if (map.containsKey(RW_MODE) && map.get(RW_MODE).toLowerCase().equals("rw")) {
+          mode = Protos.Volume.Mode.RW;
         }
+        return Protos.Volume.newBuilder()
+            .setContainerPath(map.get(CONTAINER_PATH_KEY))
+            .setHostPath(map.get(HOST_PATH_KEY))
+            .setMode(mode)
+            .build();
       }
+    });
+  }
 
-      Long [] portArray = ports.toArray(new Long [ports.size()]);
-      return new NMPorts(portArray);
-    }
-
-    @VisibleForTesting
-    CommandInfo getCommandInfo(ServiceResourceProfile profile, NMPorts ports) {
-      MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
-      CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
-      String cmd;
-
-      if (myriadExecutorConfiguration.getJvmUri().isPresent()) {
-        final String jvmRemoteUri = myriadExecutorConfiguration.getJvmUri().get();
-        LOGGER.info("Getting JRE distribution from:" + jvmRemoteUri);
-        URI jvmUri = URI.newBuilder().setValue(jvmRemoteUri).setExtract(true).build();
-        commandInfo.addUris(jvmUri);
-      }
-
-      if (myriadExecutorConfiguration.getConfigUri().isPresent()) {
-        String configURI = myriadExecutorConfiguration.getConfigUri().get();
-        LOGGER.info("Getting Hadoop distribution from: {}", configURI);
-        commandInfo.addUris(URI.newBuilder().setValue(configURI).build());
-      }
-
-      if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-        //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct.
-        if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) {
-          throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!");
-        }
-        String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get();
-        cmd = clGenerator.generateCommandLine(profile, ports);
-
-        //get the nodemanagerURI
-        //We're going to extract ourselves, so setExtract is false
-        LOGGER.info("Getting Hadoop distribution from: {}", nodeManagerUri);
-        URI nmUri = URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build();
-
-        //get configs directly from resource manager
-        String configUrlString = clGenerator.getConfigurationUrl();
-        LOGGER.info("Getting config from:" + configUrlString);
-        URI configUri = URI.newBuilder().setValue(configUrlString).build();
-        LOGGER.info("Slave will execute command: {}",  cmd);
-        commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd);
-        commandInfo.setUser(cfg.getFrameworkSuperUser().get());
-
-      } else {
-        cmd = clGenerator.generateCommandLine(profile, ports);
-        commandInfo.setValue("echo \"" + cmd + "\";" + cmd);
-
-        if (cfg.getFrameworkUser().isPresent()) {
-          commandInfo.setUser(cfg.getFrameworkUser().get());
-        }
+  protected Iterable<Protos.Parameter> getParameters(Iterable<Map<String, String>> params) {
+    Preconditions.checkNotNull(params);
+    return Iterables.transform(params, new Function<Map<String, String>, Protos.Parameter>() {
+      @Override
+      public Protos.Parameter apply(Map<String, String> parameter) {
+        Preconditions.checkNotNull(parameter, "Null parameter");
+        Preconditions.checkState(parameter.containsKey(PARAMETER_KEY_KEY), "Missing key");
+        Preconditions.checkState(parameter.containsKey(PARAMETER_VALUE_KEY), "Missing value");
+        return Protos.Parameter.newBuilder()
+            .setKey(parameter.get(PARAMETER_KEY_KEY))
+            .setValue(PARAMETER_VALUE_KEY)
+            .build();
       }
-      return commandInfo.build();
-    }
+    });
+  }
 
-    @Override
-    public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) {
-      Objects.requireNonNull(offer, "Offer should be non-null");
-      Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
-
-      NMPorts ports = getPorts(offer);
-      LOGGER.debug(ports.toString());
-
-      ServiceResourceProfile serviceProfile = nodeTask.getProfile();
-      Double taskMemory = serviceProfile.getAggregateMemory();
-      Double taskCpus = serviceProfile.getAggregateCpu();
-
-      CommandInfo commandInfo = getCommandInfo(serviceProfile, ports);
-      ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo);
-
-      TaskInfo.Builder taskBuilder = TaskInfo.newBuilder().setName(cfg.getFrameworkName() + "-" + taskId.getValue()).setTaskId(taskId).setSlaveId(
-          offer.getSlaveId());
-
-      return taskBuilder
-          .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskCpus, taskUtils.getExecutorCpus()))
-          .addAllResources(taskUtils.getScalarResource(offer, "mem", taskMemory, taskUtils.getExecutorMemory()))
-          .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(Value.Ranges.newBuilder()
-              .addRange(Range.newBuilder().setBegin(ports.getRpcPort()).setEnd(ports.getRpcPort()).build())
-              .addRange(Range.newBuilder().setBegin(ports.getLocalizerPort()).setEnd(ports.getLocalizerPort()).build())
-              .addRange(Range.newBuilder().setBegin(ports.getWebAppHttpPort()).setEnd(ports.getWebAppHttpPort()).build())
-              .addRange(Range.newBuilder().setBegin(ports.getShufflePort()).setEnd(ports.getShufflePort()).build())))
-          .setExecutor(executorInfo)
-          .build();
-    }
+  protected Protos.ContainerInfo.DockerInfo getDockerInfo(MyriadDockerConfiguration dockerConfiguration) {
+    Preconditions.checkArgument(dockerConfiguration.getNetwork().equals("HOST"), "Currently only host networking supported");
+    Protos.ContainerInfo.DockerInfo.Builder dockerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder()
+        .setImage(dockerConfiguration.getImage())
+        .setForcePullImage(dockerConfiguration.getForcePullImage())
+        .setNetwork(Protos.ContainerInfo.DockerInfo.Network.valueOf(dockerConfiguration.getNetwork()))
+        .setPrivileged(dockerConfiguration.getPrivledged())
+        .addAllParameters(getParameters(dockerConfiguration.getParameters()));
+    return dockerBuilder.build();
+  }
 
-    @Override
-    public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) {
-      ExecutorID executorId = ExecutorID.newBuilder()
-          .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + offer.getId().getValue() + offer.getSlaveId().getValue())
-          .build();
-      ExecutorInfo.Builder executorInfo = ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).setExecutorId(executorId)
-              .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskUtils.getExecutorCpus(), 0.0))
-              .addAllResources(taskUtils.getScalarResource(offer, "mem", taskUtils.getExecutorMemory(), 0.0));
-      if (cfg.getContainerInfo().isPresent()) {
-        executorInfo.setContainer(taskUtils.getContainerInfo());
-      }
-      return executorInfo.build();
+  /**
+   * Builds a ContainerInfo Object
+   *
+   * @return ContainerInfo
+   */
+  protected Protos.ContainerInfo getContainerInfo() {
+    Preconditions.checkArgument(cfg.getContainerInfo().isPresent(), "ContainerConfiguration doesn't exist!");
+    MyriadContainerConfiguration containerConfiguration = cfg.getContainerInfo().get();
+    Protos.ContainerInfo.Builder containerBuilder = Protos.ContainerInfo.newBuilder()
+        .setType(Protos.ContainerInfo.Type.valueOf(containerConfiguration.getType()))
+        .addAllVolumes(getVolumes(containerConfiguration.getVolumes()));
+    if (containerConfiguration.getDockerInfo().isPresent()) {
+      MyriadDockerConfiguration dockerConfiguration = containerConfiguration.getDockerInfo().get();
+      containerBuilder.setDocker(getDockerInfo(dockerConfiguration));
     }
+    return containerBuilder.build();
   }
 
   /**
-   * Implement NM Task Constraints
+   * Simple helper to convert Mesos Range Resource to a list of longs.
    */
-  public static class NMTaskConstraints implements TaskConstraints {
-
-    @Override
-    public int portsCount() {
-      return NMPorts.expectedNumPorts();
+  protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) {
+    List<Long> ret = new ArrayList();
+    for (Protos.Resource range : rangeResources) {
+      ret.add(range.getRanges().getRange(0).getBegin());
     }
+    return ret;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index 6ab6ee0..4bd60bc 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -18,18 +18,11 @@
  */
 package org.apache.myriad.scheduler;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import org.apache.mesos.Protos;
 import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadContainerConfiguration;
-import org.apache.myriad.configuration.MyriadDockerConfiguration;
 import org.apache.myriad.executor.MyriadExecutorDefaults;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,11 +33,7 @@ import java.util.Map;
  */
 public class TaskUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TaskUtils.class);
-  private static final String CONTAINER_PATH_KEY = "containerPath";
-  private static final String HOST_PATH_KEY = "hostPath";
-  private static final String RW_MODE = "mode";
-  private static final String PARAMETER_KEY_KEY = "key";
-  private static final String PARAMETER_VALUE_KEY = "value";
+
 
   private MyriadConfiguration cfg;
 
@@ -53,6 +42,7 @@ public class TaskUtils {
     this.cfg = cfg;
   }
 
+
   public double getNodeManagerMemory() {
     return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB();
   }
@@ -61,8 +51,11 @@ public class TaskUtils {
     return cfg.getNodeManagerConfiguration().getCpus();
   }
 
-  public double getExecutorCpus() {
+  public Map<String, Long> getNodeManagerPorts() {
+    return cfg.getNodeManagerConfiguration().getPorts();
+  }
 
+  public double getExecutorCpus() {
     return MyriadExecutorDefaults.DEFAULT_CPUS;
   }
 
@@ -74,71 +67,6 @@ public class TaskUtils {
     super();
   }
 
-  public Iterable<Protos.Volume> getVolumes(Iterable<Map<String, String>> volume) {
-    return Iterables.transform(volume, new Function<Map<String, String>, Protos.Volume>() {
-      @Nullable
-      @Override
-      public Protos.Volume apply(Map<String, String> map) {
-        Preconditions.checkArgument(map.containsKey(HOST_PATH_KEY) && map.containsKey(CONTAINER_PATH_KEY));
-        Protos.Volume.Mode mode = Protos.Volume.Mode.RO;
-        if (map.containsKey(RW_MODE) && map.get(RW_MODE).toLowerCase().equals("rw")) {
-          mode = Protos.Volume.Mode.RW;
-        }
-        return Protos.Volume.newBuilder()
-            .setContainerPath(map.get(CONTAINER_PATH_KEY))
-            .setHostPath(map.get(HOST_PATH_KEY))
-            .setMode(mode)
-            .build();
-      }
-    });
-  }
-
-  public Iterable<Protos.Parameter> getParameters(Iterable<Map<String, String>> params) {
-    Preconditions.checkNotNull(params);
-    return Iterables.transform(params, new Function<Map<String, String>, Protos.Parameter>() {
-      @Override
-      public Protos.Parameter apply(Map<String, String> parameter) {
-        Preconditions.checkNotNull(parameter, "Null parameter");
-        Preconditions.checkState(parameter.containsKey(PARAMETER_KEY_KEY), "Missing key");
-        Preconditions.checkState(parameter.containsKey(PARAMETER_VALUE_KEY), "Missing value");
-        return Protos.Parameter.newBuilder()
-            .setKey(parameter.get(PARAMETER_KEY_KEY))
-            .setValue(PARAMETER_VALUE_KEY)
-            .build();
-      }
-    });
-  }
-
-  private Protos.ContainerInfo.DockerInfo getDockerInfo(MyriadDockerConfiguration dockerConfiguration) {
-    Preconditions.checkArgument(dockerConfiguration.getNetwork().equals("HOST"), "Currently only host networking supported");
-    Protos.ContainerInfo.DockerInfo.Builder dockerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder()
-        .setImage(dockerConfiguration.getImage())
-        .setForcePullImage(dockerConfiguration.getForcePullImage())
-        .setNetwork(Protos.ContainerInfo.DockerInfo.Network.valueOf(dockerConfiguration.getNetwork()))
-        .setPrivileged(dockerConfiguration.getPrivledged())
-        .addAllParameters(getParameters(dockerConfiguration.getParameters()));
-    return dockerBuilder.build();
-  }
-
-  /**
-   * Builds a ContainerInfo Object
-   *
-   * @return ContainerInfo
-   */
-  public Protos.ContainerInfo getContainerInfo() {
-    Preconditions.checkArgument(cfg.getContainerInfo().isPresent(), "ContainerConfiguration doesn't exist!");
-    MyriadContainerConfiguration containerConfiguration = cfg.getContainerInfo().get();
-    Protos.ContainerInfo.Builder containerBuilder = Protos.ContainerInfo.newBuilder()
-        .setType(Protos.ContainerInfo.Type.valueOf(containerConfiguration.getType()))
-        .addAllVolumes(getVolumes(containerConfiguration.getVolumes()));
-    if (containerConfiguration.getDockerInfo().isPresent()) {
-      MyriadDockerConfiguration dockerConfiguration = containerConfiguration.getDockerInfo().get();
-      containerBuilder.setDocker(getDockerInfo(dockerConfiguration));
-    }
-    return containerBuilder.build();
-  }
-
-
   /**
    * Helper function that returns all scalar resources of a given name in an offer up to a given value.  Attempts to
    * take resource from the prescribed role first and then from the default role.  The variable used indicated any
@@ -152,7 +80,7 @@ public class TaskUtils {
    */
   public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) {
     String role = cfg.getFrameworkRole();
-    List<Protos.Resource> resources = new ArrayList<Protos.Resource>();
+    List<Protos.Resource> resources = new ArrayList<>();
 
     double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role *
     //Find role by name, must loop through resources

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index f0e80e9..c65ad4a 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -20,34 +20,24 @@ package org.apache.myriad.scheduler.event.handlers;
 
 import com.google.common.collect.Sets;
 import com.lmax.disruptor.EventHandler;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.inject.Inject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
 import org.apache.mesos.SchedulerDriver;
+import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.scheduler.SchedulerUtils;
 import org.apache.myriad.scheduler.ServiceResourceProfile;
-import org.apache.myriad.scheduler.TaskConstraints;
-import org.apache.myriad.scheduler.TaskConstraintsManager;
 import org.apache.myriad.scheduler.TaskFactory;
-import org.apache.myriad.scheduler.TaskUtils;
 import org.apache.myriad.scheduler.constraints.Constraint;
-import org.apache.myriad.scheduler.constraints.LikeConstraint;
 import org.apache.myriad.scheduler.event.ResourceOffersEvent;
 import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
 import org.apache.myriad.state.NodeTask;
 import org.apache.myriad.state.SchedulerState;
 import org.slf4j.Logger;
@@ -61,26 +51,20 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
 
   private static final Lock driverOperationLock = new ReentrantLock();
 
-  private static final String RESOURCES_CPU_KEY = "cpus";
-  private static final String RESOURCES_MEM_KEY = "mem";
-  private static final String RESOURCES_PORTS_KEY = "ports";
-  private static final String RESOURCES_DISK_KEY = "disk";
-
-
-  @Inject
   private SchedulerState schedulerState;
-
-  @Inject
-  private TaskUtils taskUtils;
-
-  @Inject
+  private MyriadConfiguration cfg;
   private Map<String, TaskFactory> taskFactoryMap;
-
-  @Inject
   private OfferLifecycleManager offerLifecycleMgr;
+  private String role;
 
   @Inject
-  private TaskConstraintsManager taskConstraintsManager;
+  public ResourceOffersEventHandler(SchedulerState schedulerState, MyriadConfiguration cfg, Map<String, TaskFactory> taskFactoryMap, OfferLifecycleManager offerLifecycleManager) {
+    this.schedulerState = schedulerState;
+    this.cfg = cfg;
+    this.taskFactoryMap = taskFactoryMap;
+    this.offerLifecycleMgr = offerLifecycleManager;
+    this.role = cfg.getFrameworkRole();
+  }
 
   @Override
   public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception {
@@ -128,19 +112,16 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
             launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix));
             launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix));
 
-            if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) {
+            ResourceOfferContainer resourceOfferContainer = new ResourceOfferContainer(offer, taskToLaunch.getProfile(), role);
+            if (SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)
+                && resourceOfferContainer.satisfies(taskToLaunch.getProfile(), constraint)) {
               try {
-                final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID().get(),
-                    pendingTaskId, taskToLaunch);
-                List<OfferID> offerIds = new ArrayList<>();
-                offerIds.add(offer.getId());
-                List<TaskInfo> tasks = new ArrayList<>();
-                tasks.add(task);
+                final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(resourceOfferContainer,
+                    schedulerState.getFrameworkID().get(), pendingTaskId, taskToLaunch);
                 LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
                 LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
-                driver.launchTasks(offerIds, tasks);
+                driver.launchTasks(Collections.singleton(offer.getId()), Collections.singleton(task));
                 schedulerState.makeTaskStaging(pendingTaskId);
-
                 // For every NM Task that we launch, we currently
                 // need to backup the ExecutorInfo for that NM Task in the State Store.
                 // Without this, we will not be able to launch tasks corresponding to yarn
@@ -179,122 +160,4 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
       driverOperationLock.unlock();
     }
   }
-
-  private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) {
-    if (!meetsConstraint(offer, constraint)) {
-      return false;
-    }
-    Map<String, Object> results = new HashMap<String, Object>(5);
-    //Assign default values to avoid NPE
-    results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0));
-
-    for (Resource resource : offer.getResourcesList()) {
-      if (resourceEvaluators.containsKey(resource.getName())) {
-        resourceEvaluators.get(resource.getName()).eval(resource, results);
-      } else {
-        LOGGER.warn("Ignoring unknown resource type: {}", resource.getName());
-      }
-    }
-    double cpus = (Double) results.get(RESOURCES_CPU_KEY);
-    double mem = (Double) results.get(RESOURCES_MEM_KEY);
-    int ports = (Integer) results.get(RESOURCES_PORTS_KEY);
-
-    checkResource(cpus <= 0, RESOURCES_CPU_KEY);
-    checkResource(mem <= 0, RESOURCES_MEM_KEY);
-    checkResource(ports <= 0, RESOURCES_PORTS_KEY);
-
-    return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
-  }
-
-  private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) {
-    final ServiceResourceProfile profile = taskToLaunch.getProfile();
-    final String taskPrefix = taskToLaunch.getTaskPrefix();
-    final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu();
-    final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory();
-    final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix);
-    if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) {
-      return true;
-    } else {
-      LOGGER.debug("Offer insufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports);
-      return false;
-    }
-  }
-
-  private boolean meetsConstraint(Offer offer, Constraint constraint) {
-    if (constraint != null) {
-      switch (constraint.getType()) {
-        case LIKE: {
-          LikeConstraint likeConstraint = (LikeConstraint) constraint;
-          if (likeConstraint.isConstraintOnHostName()) {
-            return likeConstraint.matchesHostName(offer.getHostname());
-          } else {
-            return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
-          }
-        }
-        default:
-          return false;
-      }
-    }
-    return true;
-  }
-
-  private void checkResource(boolean fail, String resource) {
-    if (fail) {
-      LOGGER.debug("No " + resource + " resources present");
-    }
-  }
-
-  private static Double scalarToDouble(Resource resource, String id) {
-    Double value = new Double(0.0);
-    if (resource.getType().equals(Value.Type.SCALAR)) {
-      value = new Double(resource.getScalar().getValue());
-    } else {
-      LOGGER.error(id + " resource was not a scalar: {}", resource.getType().toString());
-    }
-    return value;
-  }
-
-  private interface EvalResources {
-    public void eval(Resource resource, Map<String, Object> results);
-  }
-
-  private static Map<String, EvalResources> resourceEvaluators;
-
-  static {
-    resourceEvaluators = new HashMap<String, EvalResources>(4);
-    resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY));
-      }
-    });
-    resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY));
-      }
-    });
-    resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-      }
-    });
-    resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        int ports = 0;
-        if (resource.getType().equals(Value.Type.RANGES)) {
-          Value.Ranges ranges = resource.getRanges();
-          for (Value.Range range : ranges.getRangeList()) {
-            if (range.getBegin() < range.getEnd()) {
-              ports += range.getEnd() - range.getBegin() + 1;
-            }
-          }
-        } else {
-          LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString());
-
-        }
-        results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports));
-      }
-    });
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java
new file mode 100644
index 0000000..45258f0
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.resource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.mesos.Protos;
+
+import java.util.*;
+
+/**
+ * Mutable POJO for handling RangeResources, specifically ports.
+ */
+
+public class RangeResource {
+  private String name;
+  private List<Range> ranges = new ArrayList<>();
+  private Long numValues = 0L;
+  private Long numDefaultValues = 0L;
+
+  private String role;
+  @VisibleForTesting //This way we can set a seed to get deterministic values
+  private Random random = new Random(102);
+
+  public RangeResource(String name, String role) {
+    this.name = name;
+    this.role = role;
+  }
+
+  public boolean satisfies(Collection<Long> requestedValues) {
+    if (requestedValues.size() > numValues) {
+      return false;
+    }
+    List<Long> tmp = new ArrayList<>();
+    tmp.addAll(requestedValues);
+    tmp.removeAll(Collections.singleton(0L));
+    for (Long val : tmp) {
+      if (!contains(val)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean contains(Long value) {
+    for (Range range: ranges) {
+      if (range.contains(value)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public List<Long> getValues() {
+    List<Long> ret = new ArrayList<>();
+    for (Range range: ranges) {
+      ret.addAll(range.allValues());
+    }
+    return ret;
+  }
+
+  public void addRanges(List <Protos.Value.Range> ranges, Boolean withRole) {
+    for (Protos.Value.Range range : ranges) {
+      long tb = range.getBegin();
+      long te = range.getEnd();
+      this.ranges.add(new Range(tb, te, withRole));
+      numValues += (te - tb + 1);
+      if (!withRole) {
+        numDefaultValues += (te - tb + 1);
+      }
+    }
+  }
+
+  public List<Protos.Resource> consumeResource(Collection<Long> requestedValues) {
+    Preconditions.checkState(satisfies(requestedValues));
+    List<Protos.Resource> resources = new ArrayList<>();
+    List<Long> nonZeros = new ArrayList<>();
+    nonZeros.addAll(requestedValues);
+    nonZeros.removeAll(Collections.singleton(0L));
+    for (Long value : nonZeros) {
+      resources.add(createResource(value, hasRole(value)));
+    }
+    List<Long> randomValues = getRandomValues(requestedValues.size() - nonZeros.size());
+    for (Long value: randomValues) {
+      resources.add(createResource(value, false));
+    }
+    return resources;
+  }
+
+  private Protos.Resource createResource(Long value, Boolean withRole) {
+    Preconditions.checkState(removeValue(value), "Value " + value + " doesn't exist");
+    Protos.Resource.Builder builder = Protos.Resource.newBuilder()
+        .setName(name)
+        .setType(Protos.Value.Type.RANGES)
+        .setRanges(Protos.Value.Ranges.newBuilder()
+            .addRange(Protos.Value.Range.newBuilder()
+                .setBegin(value)
+                .setEnd(value)
+                .build()
+            )
+        );
+    if (withRole) {
+      builder.setRole(role);
+    }
+    return builder.build();
+  }
+
+  private List<Long> getRandomValues(int size) {
+    //can improve this
+    List<Integer> sample = new ArrayList<>(size);
+    while (sample.size() < size) {
+      int rand = random.nextInt(numDefaultValues.intValue());
+      if (!sample.contains(rand)) {
+        sample.add(rand);
+      }
+    }
+    Collections.sort(sample);
+
+    long location = 0;
+    long lastLocation = 0;
+    int j = 0;
+    List<Long> elems = new ArrayList<>();
+    for (Range range : ranges) {
+      if (!range.role) {
+        long tb = range.begin;
+        long te = range.end;
+        location += te - tb + 1;
+        for (int i = j; i < sample.size(); i++) {
+          long val = sample.get(i);
+          if (val < location) {
+            elems.add(tb + val - lastLocation);
+            j++;
+          } else {
+            lastLocation = location;
+            break;
+          }
+        }
+      }
+    }
+    return elems;
+  }
+
+  private boolean removeValue(Long value) {
+    for (Range range : ranges) {
+      if (range.contains(value)) {
+        ranges.remove(range);
+        long begin = range.begin;
+        long end = range.end;
+        if (value != begin && value != end) {
+          ranges.add(new Range(begin, value - 1, range.role));
+          ranges.add(new Range(value + 1, end, range.role));
+          return true;
+        } else if (value == begin && value != end) {
+          ranges.add(new Range(value + 1, end, range.role));
+          return true;
+        } else if (value == end && value != begin) {
+          ranges.add(new Range(begin, value - 1, range.role));
+          return true;
+        } else {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private boolean hasRole(Long value) {
+    for (Range range : ranges) {
+      if (range.contains(value)) {
+        return range.role;
+      }
+    }
+    return false;
+  }
+
+  private static class Range {
+    Long begin;
+    Long end;
+    Boolean role;
+
+    public Range(Long begin, Long end, Boolean role){
+      this.begin = begin;
+      this.end = end;
+      this.role = role;
+    }
+    public Collection<Long> allValues() {
+      List<Long> ret = new ArrayList<>();
+      for (long i = begin; i <= end; i++) {
+        ret.add(i);
+      }
+      return ret;
+    }
+    public Boolean contains(Long value) {
+      return (value >= begin && value <= end);
+    }
+
+    public String toString() {
+      return "(" + begin + "," + end + ")";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java
new file mode 100644
index 0000000..13efbc2
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.mesos.Protos;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.constraints.Constraint;
+import org.apache.myriad.scheduler.constraints.LikeConstraint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Container class to get and keep track of mesos resources
+ */
+public class ResourceOfferContainer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOfferContainer.class);
+  private static final String RESOURCE_CPUS = "cpus";
+  private static final String RESOURCE_MEM = "mem";
+  private static final String RESOURCE_PORTS = "ports";
+
+  private HashMap<String, ScalarResource> scalarValues = new HashMap<>();
+  private HashMap<String, RangeResource> rangeValues = new HashMap<>();
+
+  private Protos.Offer offer;
+  private String role;
+
+  /**
+   * Constructor takes an offer and profile and constructs a mutable POJO to handle resource offers.
+   *
+   * @param offer   Mesos.Protos.Offer
+   * @param profile ServiceResourceProfile
+   */
+  public ResourceOfferContainer(Protos.Offer offer, ServiceResourceProfile profile, String role) {
+    this.offer = offer;
+    this.role = role;
+    setScalarValues();
+    //ports = new RangeResource(offer, RESOURCE_PORTS, profile.getPorts().values(), role);
+  }
+
+  /**
+   * returns the hostname contained in the offer
+   *
+   * @return hostname
+   */
+  public String getHostName() {
+    return offer.getHostname();
+  }
+
+  public String getOfferId() {
+    return offer.getId().getValue();
+  }
+
+  public Protos.SlaveID getSlaveId() {
+    return offer.getSlaveId();
+  }
+
+  public double getScalarValue(String name) {
+    return scalarValues.get(name).getTotalValue();
+  }
+
+  public double getCpus() {
+    return getScalarValue(RESOURCE_CPUS);
+  }
+
+  public double getMem() {
+    return getScalarValue(RESOURCE_MEM);
+  }
+
+  public List<Long> getPorts() {
+    return rangeValues.get(RESOURCE_PORTS).getValues();
+  }
+
+  /**
+   * Returns true if the offer meets the profile resource needs
+   *
+   * @param profile
+   * @return
+   */
+  public boolean satisfies(ServiceResourceProfile profile) {
+    return scalarValues.containsKey(RESOURCE_CPUS) && scalarValues.get(RESOURCE_CPUS).satisfies(profile.getAggregateCpu()) &&
+        scalarValues.containsKey(RESOURCE_MEM) && scalarValues.get(RESOURCE_MEM).satisfies(profile.getAggregateMemory()) &&
+        rangeValues.containsKey(RESOURCE_PORTS) && rangeValues.get(RESOURCE_PORTS).satisfies(profile.getPorts().values());
+  }
+
+  /**
+   * Returns true if offer meets the profile resource needs AND the task constaint (an attibritute of hostname)
+   *
+   * @param profile
+   * @param constraint
+   * @return
+   */
+  public boolean satisfies(ServiceResourceProfile profile, Constraint constraint) {
+    return satisfies(profile) && meetsConstraint(constraint);
+  }
+
+  private boolean meetsConstraint(Constraint constraint) {
+    if (constraint != null) {
+      switch (constraint.getType()) {
+        case LIKE: {
+          LikeConstraint likeConstraint = (LikeConstraint) constraint;
+          if (likeConstraint.isConstraintOnHostName()) {
+            return likeConstraint.matchesHostName(offer.getHostname());
+          } else {
+            return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
+          }
+        }
+        default:
+          return false;
+      }
+    }
+    return true;
+  }
+
+  private List<Protos.Resource> consumeScalarResource(String name, Double value) {
+    Preconditions.checkState(scalarValues.containsKey(name));
+    return scalarValues.get(name).consumeResource(value);
+  }
+
+  /**
+   * Returns a list of CPU Resources meeting the requested value.
+   * Decrements the available CPU resources available in the offer.
+   * Uses Preconditions the ensure value is not more that the amount the offer has.
+   *
+   * @param value
+   * @return List<Protos.Resource>
+   */
+  public List<Protos.Resource> consumeCpus(Double value) {
+    return consumeScalarResource(RESOURCE_CPUS, value);
+  }
+
+  /**
+   * Returns a list of MEM Resources meeting the requested value.
+   * Decrements the available MEM resources available in the offer.
+   * Uses Preconditions the ensure value is not more that the amount the offer has.
+   *
+   * @param value
+   * @return List<Protos.Resource>
+   */
+  public List<Protos.Resource> consumeMem(Double value) {
+    return consumeScalarResource(RESOURCE_MEM, value);
+  }
+
+  /**
+   * Returns a list of Range Resources meeting the requestedvalues.
+   * Removes the requested values from the available range resources available in the offer.
+   * Uses Preconditions the ensure values are contained in the offer.
+   *
+   * @param requestedValues
+   * @return List<Protos.Resource>
+   */
+  public List<Protos.Resource> consumePorts(Collection<Long> requestedValues) {
+    return rangeValues.get(RESOURCE_PORTS).consumeResource(requestedValues);
+  }
+
+  private void setScalarValues() {
+    for (Protos.Resource r : offer.getResourcesList()) {
+      if (r.hasScalar() && r.hasName() && r.hasRole() && r.getRole().equals(role)) {
+        addToScalarResource(r.getName(), r.getScalar().getValue(), true);
+      } else if (r.hasName() && r.hasScalar()) {
+        addToScalarResource(r.getName(), r.getScalar().getValue(), false);
+      } else if (r.hasRanges() && r.hasName() && r.hasRole() && r.getRole().equals(role)) {
+        addToRangeResource(r.getName(), r.getRanges().getRangeList(), true);
+      } else if (r.hasRanges() && r.hasName()) {
+        addToRangeResource(r.getName(), r.getRanges().getRangeList(), false);
+      }
+    }
+  }
+
+  private void addToScalarResource(String name, Double value, Boolean hasRole) {
+    if (scalarValues.containsKey(name)) {
+      scalarValues.get(name).incrementValue(value, hasRole);
+    } else {
+      scalarValues.put(name, new ScalarResource(name, role));
+      scalarValues.get(name).incrementValue(value, hasRole);
+    }
+  }
+
+  private void addToRangeResource(String name, List<Protos.Value.Range> values , Boolean hasRole) {
+    if (rangeValues.containsKey(name)) {
+      rangeValues.get(name).addRanges(values, hasRole);
+    } else {
+      rangeValues.put(name, new RangeResource(name, role));
+      rangeValues.get(name).addRanges(values, hasRole);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java
new file mode 100644
index 0000000..a93ecee
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.resource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.mesos.Protos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mutable POJO for handling Scalar Resources
+ */
+@VisibleForTesting
+class ScalarResource {
+  double defaultValue = 0.0;
+  double roleValue = 0.0;
+  String name;
+  String role;
+
+  public ScalarResource(String name, String role) {
+    this.name = name;
+    this.role = role;
+  }
+
+  public void incrementValue(Double value, Boolean role) {
+    if (role) {
+      roleValue += value;
+    } else {
+      defaultValue += value;
+    }
+  }
+
+  public Double getTotalValue() {
+    return defaultValue + roleValue;
+  }
+
+  public Boolean satisfies(Double value) {
+    return defaultValue + roleValue >= value;
+  }
+
+  public List<Protos.Resource> consumeResource(Double value) {
+    Preconditions.checkState(roleValue + defaultValue >= value, String.format("%s value requested: %f, greater " +
+        "than amount held %f", name, value, roleValue + defaultValue));
+    List<Protos.Resource> resources = new ArrayList<>();
+    if (roleValue >= value) {
+      roleValue -= value;
+      resources.add(createResource(name, value, true));
+    } else if (roleValue + defaultValue >= value && roleValue > 0) {
+      resources.add(createResource(name, roleValue, true));
+      resources.add(createResource(name, value - roleValue, false));
+      defaultValue -= (value - roleValue);
+      roleValue = 0;
+    } else if (roleValue + defaultValue >= value) {
+      resources.add(createResource(name, value, false));
+      defaultValue -= value;
+    }
+    return resources;
+  }
+
+  private Protos.Resource createResource(String name, Double value, boolean withRole) {
+    Protos.Resource.Builder builder = Protos.Resource.newBuilder()
+        .setName(name)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value))
+        .setType(Protos.Value.Type.SCALAR);
+    if (withRole) {
+      builder.setRole(role);
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
index 9069c1a..5251f19 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -66,7 +66,6 @@ public class MyriadFairScheduler extends FairScheduler {
     rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler);
     super.setRMContext(rmContext);
   }
-
   /**
    * ******** Methods overridden from YARN {@link FairScheduler}  *********************
    */

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
index fbfb0d2..dc7a00b 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
@@ -26,7 +26,6 @@ import java.io.*;
 import java.util.*;
 import org.apache.myriad.configuration.*;
 import org.apache.myriad.scheduler.*;
-import org.apache.myriad.scheduler.TaskFactory.*;
 import org.slf4j.*;
 
 /**
@@ -58,7 +57,7 @@ public class MyriadTestModule extends AbstractModule {
     bind(MyriadConfiguration.class).toInstance(cfg);
 
     MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
-    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
     Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
     for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
       String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
@@ -70,7 +69,7 @@ public class MyriadTestModule extends AbstractModule {
           e.printStackTrace();
         }
       } else {
-        mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+        mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactory.class).in(Scopes.SINGLETON);
       }
     }
   }
@@ -78,14 +77,7 @@ public class MyriadTestModule extends AbstractModule {
   @Provides
   @Singleton
   ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
-    ExecutorCommandLineGenerator cliGenerator = null;
-    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
-    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-      cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
-    } else {
-      cliGenerator = new NMExecutorCLGenImpl(cfg);
-    }
-    return cliGenerator;
+    return new NMExecutorCommandLineGenerator(cfg);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
index e57b128..e0eda0f 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
@@ -16,6 +16,8 @@ import org.apache.myriad.state.SchedulerState;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.TreeMap;
+
 /**
  * Unit tests for SchedulerStateResource
  */
@@ -34,10 +36,11 @@ public class SchedulerStateResourceTest extends BaseConfigurableTest {
     idOne = Protos.TaskID.newBuilder().setValue("nt-1").build();
     idTwo = Protos.TaskID.newBuilder().setValue("nt-2").build();
     idThree = Protos.TaskID.newBuilder().setValue("nt-3").build();
+    TreeMap<String, Long> ports = new TreeMap<>();
 
-    state.addTask(idOne, new NodeTask(new ServiceResourceProfile("profile1", 0.2, 1024.0), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
-    state.addTask(idTwo, new NodeTask(new ServiceResourceProfile("profile2", 0.4, 2048.0), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
-    state.addTask(idThree, new NodeTask(new ServiceResourceProfile("profile3", 0.6, 3072.0), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
+    state.addTask(idOne, new NodeTask(new ServiceResourceProfile("profile1", 0.2, 1024.0, ports), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
+    state.addTask(idTwo, new NodeTask(new ServiceResourceProfile("profile2", 0.4, 2048.0, ports), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
+    state.addTask(idThree, new NodeTask(new ServiceResourceProfile("profile3", 0.6, 3072.0, ports), new LikeConstraint("localhost", "host-[0-9]*.example.com")));
 
     state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
     state.makeTaskActive(idOne);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java
index 2a60e58..80c4b89 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.scheduler;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java
index d384150..3538a34 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.scheduler;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
index 78f3627..29087e7 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.scheduler;
 
 import static org.junit.Assert.assertEquals;
@@ -19,6 +36,8 @@ import org.apache.myriad.webapp.MyriadWebServer;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.TreeMap;
+
 /**
  * Unit tests for MyriadOperations class
  */
@@ -52,7 +71,8 @@ public class MyriadOperationsTest extends BaseConfigurableTest {
   }
 
   private void generateProfiles() {
-    small = new ServiceResourceProfile("small", 0.1, 512.0);
+    TreeMap<String, Long> ports = new TreeMap<>();
+    small = new ServiceResourceProfile("small", 0.1, 512.0, ports);
   }
 
   @Test 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java
index a0aab74..d102090 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.scheduler;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
index e394dd8..6555f04 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
@@ -53,8 +53,8 @@ class SchedulerUtilsSpec extends Specification {
         given:
         def state = Mock(SchedulerState)
         def tasks = []
-        def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0), null)
-        def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0), null)
+        def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0, new HashMap<String, Long>()), null)
+        def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0, new HashMap<String, Long>()), null)
         fgsNMTask.setHostname("test_fgs_hostname")
         cgsNMTask.setHostname("test_cgs_hostname")
         tasks << fgsNMTask << cgsNMTask
@@ -82,7 +82,7 @@ class SchedulerUtilsSpec extends Specification {
 
 
     NodeTask createNodeTask(String hostname) {
-        def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0), null)
+        def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0, new HashMap<String, Long>()), null)
         node.hostname = hostname
         node.taskPrefix = "nm"
         node

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java
index 72e0092..7623cba 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java
@@ -1,9 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.scheduler;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.TreeMap;
+
 /**
  * Unit test cases for ServiceResourceProfile
  *
@@ -13,15 +32,14 @@ public class ServiceResourceProfileTest {
 
   @Before
   public void setUp() throws Exception {
-    profile = new ServiceResourceProfile("ServiceResourceProfile", 0.1, 1024.0, 0.1, 512.0);
+    TreeMap<String, Long> ports = new TreeMap<>();
+    profile = new ServiceResourceProfile("ServiceResourceProfile", 0.1, 1024.0, ports);
   }
 
   @Test
   public void testRequestedResources() throws Exception {
     Assert.assertEquals(new Double(0.1), profile.getCpus());
     Assert.assertEquals(new Double(1024.0), profile.getMemory());
-    Assert.assertEquals(new Double(0.1), profile.getExecutorCpu());
-    Assert.assertEquals(new Double(512.0), profile.getExecutorMemory());
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java
index 17e043f..4ad11a5 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java
@@ -22,15 +22,15 @@ import javax.inject.Inject;
 import org.apache.mesos.Protos.CommandInfo;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
 
 /**
  * Test implementation of TaskFactory
  */
-public class TMSTaskFactoryImpl implements TaskFactory {
+public class TMSTaskFactoryImpl extends TaskFactory {
 
   private MyriadConfiguration cfg;
   private TaskUtils taskUtils;
@@ -42,7 +42,7 @@ public class TMSTaskFactoryImpl implements TaskFactory {
   }
 
   @Override
-  public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask) {
+  public TaskInfo createTask(ResourceOfferContainer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask) {
     return null;
   }
 
@@ -63,7 +63,8 @@ public class TMSTaskFactoryImpl implements TaskFactory {
   }
 
   @Override
-  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) {
+  public ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, FrameworkID frameworkId,
+                                              CommandInfo commandInfo) {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java
deleted file mode 100644
index 94946ce..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.myriad.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.myriad.BaseConfigurableTest;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Unit tests for TaskConstraintsManager
- */
-public class TaskConstraintsManagerTest extends BaseConfigurableTest {
-  TaskConstraintsManager manager = new TaskConstraintsManager();
-  
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-    manager.addTaskConstraints("jobhistory", new ServiceTaskConstraints(cfg, "jobhistory"));
-  }
-
-  @Test
-  public void testAddConstraints() throws Exception {
-    assertTrue(manager.exists("jobhistory"));
-  }
-
-  @Test
-  public void testGetConstraints() throws Exception {
-    TaskConstraints tCon = manager.getConstraints("jobhistory");
-    assertEquals(3, tCon.portsCount());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java
new file mode 100644
index 0000000..39d6052
--- /dev/null
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import org.apache.mesos.Protos;
+import org.apache.myriad.BaseConfigurableTest;
+import org.apache.myriad.scheduler.offer.OfferBuilder;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
+import org.apache.myriad.state.NodeTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for NMTaskFactory Class
+ */
+public class TestNMTaskFactory extends BaseConfigurableTest {
+  static Protos.FrameworkID frameworkId = Protos.FrameworkID.newBuilder().setValue("test").build();
+
+  @Test
+  public void testNMTaskFactory() {
+    NMExecutorCommandLineGenerator clGenerator = new NMExecutorCommandLineGenerator(cfgWithDocker);
+    TaskUtils taskUtils = new TaskUtils(cfgWithDocker);
+    Protos.Offer offer = new OfferBuilder("test.com")
+        .addScalarResource("cpus", 10.0)
+        .addScalarResource("mem", 16000)
+        .addRangeResource("ports", 3500, 3505)
+        .build();
+    ServiceResourceProfile profile = new ExtendedResourceProfile(new NMProfile("tooMuchCpu", 7L, 8000L), taskUtils.getNodeManagerCpus(),
+        taskUtils.getNodeManagerMemory(), taskUtils.getNodeManagerPorts());
+    NodeTask nodeTask = new NodeTask(profile, null);
+    ResourceOfferContainer roc = new ResourceOfferContainer(offer, profile, null);
+    NMTaskFactory taskFactory = new NMTaskFactory(cfgWithDocker, taskUtils, clGenerator);
+    Protos.TaskInfo taskInfo = taskFactory.createTask(roc, frameworkId, makeTaskId("nm.zero"), nodeTask);
+    assertFalse("taskInfo should not have a container", taskInfo.hasContainer());
+    assertTrue("The container should have an executor", taskInfo.hasExecutor());
+    Protos.ExecutorInfo executorInfo = taskInfo.getExecutor();
+    assertTrue("executorInfo should have container", executorInfo.hasContainer());
+    Protos.ContainerInfo containerInfo = executorInfo.getContainer();
+    assertTrue("There should be two volumes", containerInfo.getVolumesCount() == 2);
+    assertTrue("The first volume should be read only", containerInfo.getVolumes(0).getMode().equals(Protos.Volume.Mode.RO));
+    assertTrue("The first volume should be read write", containerInfo.getVolumes(1).getMode().equals(Protos.Volume.Mode.RW));
+    assertTrue("There should be a docker image", containerInfo.getDocker().hasImage());
+    assertTrue("The docker image should be mesos/myraid", containerInfo.getDocker().getImage().equals("mesos/myriad"));
+    assertTrue("Should be using host networking", containerInfo.getDocker().getNetwork().equals(Protos.ContainerInfo.DockerInfo.Network.HOST));
+    assertTrue("There should be two parameters", containerInfo.getDocker().getParametersList().size() == 2);
+    assertTrue("Privledged mode should be false", !containerInfo.getDocker().getPrivileged());
+  }
+
+  private Protos.TaskID makeTaskId(String taskId) {
+    return Protos.TaskID.newBuilder().setValue(taskId).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
deleted file mode 100644
index dd36436..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.Value.Range;
-import org.apache.mesos.Protos.Value.Ranges;
-import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Test Class to test NM ports randomization
- *
- */
-public class TestRandomPorts {
-
-  
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
-  @Test
-  public void testRandomPorts() {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
-    Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build();
-    Range range3 = Range.newBuilder().setBegin(310).setEnd(500).build();
-    Range range4 = Range.newBuilder().setBegin(520).setEnd(720).build();
-    Range range5 = Range.newBuilder().setBegin(750).setEnd(1000).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .addRange(range3)
-        .addRange(range4)
-        .addRange(range5).build();
-    
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
-    
-    assertEquals(NMPorts.expectedNumPorts(), ports.size());
-    List<Long> sortedList = Lists.newArrayList(ports);
-    
-    Collections.sort(sortedList);
-    
-    for (Long port : sortedList) {
-      assertTrue((port >= 100 && port <= 200) ||
-          (port >= 250 && port <= 300) ||
-          (port >= 310 && port <= 500) ||
-          (port >= 520 && port <= 720) ||
-          (port >= 750 && port <= 1000));
-    }
-  }
-
-  @Test
-  public void testRandomPortsNotEnough() {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
-    Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .build();
-    
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
-    
-    assertEquals(NMPorts.expectedNumPorts(), ports.size());
-    List<Long> sortedList = Lists.newArrayList(ports);
-    
-    Collections.sort(sortedList);
-
-    for (Long port : sortedList) {
-      assertTrue((port >= 100 && port <= 200) ||
-          (port >= 250 && port <= 300));
-    }    
-  }
-
-  @Test
-  public void testRandomPortsNotEnoughPercentKickIn() {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
-    Range range2 = Range.newBuilder().setBegin(250).setEnd(335).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .build();
-    
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
-    
-    assertEquals(NMPorts.expectedNumPorts(), ports.size());
-    List<Long> sortedList = Lists.newArrayList(ports);
-    
-    Collections.sort(sortedList);
-
-    for (int i = 0; i < sortedList.size(); i++) {
-      assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 200) ||
-          (sortedList.get(i) >= 250 && sortedList.get(i) <= 335));
-    }
-  }
-  
-  @Test
-  public void testRandomPortsLargeRange() {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(500).build();
-    Range range2 = Range.newBuilder().setBegin(550).setEnd(835).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .build();
-    
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
-    
-    assertEquals(NMPorts.expectedNumPorts(), ports.size());
-    List<Long> sortedList = Lists.newArrayList(ports);
-    
-    Collections.sort(sortedList);
-
-    for (int i = 0; i < sortedList.size(); i++) {
-      assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 500) || 
-          (sortedList.get(i) >= 550 && sortedList.get(i) <= 835));
-    }
-  }
-
-  @Test
-  public void testRandomPortsSmallRange() {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build();
-    Range range2 = Range.newBuilder().setBegin(110).setEnd(115).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .build();
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
-    
-    assertEquals(NMPorts.expectedNumPorts(), ports.size());
-    List<Long> sortedList = Lists.newArrayList(ports);
-    
-    Collections.sort(sortedList);
-
-    for (int i = 0; i < sortedList.size(); i++) {
-      assertTrue(sortedList.get(i) == 100 || (sortedList.get(i) <= 115 && sortedList.get(i) >= 110));
-    }
-  }
-  
-  @Test
-  public void notEnoughPorts() throws Exception {
-    Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build();
-    Range range2 = Range.newBuilder().setBegin(110).setEnd(111).build();
-    
-    Ranges ranges = Ranges.newBuilder().addRange(range1)
-        .addRange(range2)
-        .build();
-    
-    Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
-    
-    try {
-      NMTaskFactoryImpl.getNMPorts(resource);
-      fail("Should fail, as number of ports is not enough");
-    } catch (IllegalStateException ise) {
-      // should get here
-    }
-
-  }
-}



Mime
View raw message