myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [3/4] incubator-myriad git commit: Myriad-Issue-60 Added ability to launch JHS and other services that fit under Myriad umbrella from MyriadScheduler using default Mesos slave executor
Date Fri, 16 Oct 2015 20:39:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
new file mode 100644
index 0000000..19c76d0
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
@@ -0,0 +1,266 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.ebay.myriad.scheduler;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Protos.CommandInfo.URI;
+import org.apache.mesos.Protos.Value.Scalar;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
+import com.ebay.myriad.configuration.ServiceConfiguration;
+import com.ebay.myriad.state.NodeTask;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Generic Service Class that allows to create a service solely base don the configuration
+ * Main properties of configuration are:
+ * 1. command to run
+ * 2. Additional env. variables to set (serviceOpts)
+ * 3. ports to use with names of the properties
+ * 4. TODO (yufeldman) executor info 
+ *
+ */
+public class ServiceTaskFactoryImpl implements TaskFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskFactoryImpl.class);
+    
+  public static final long DEFAULT_PORT_NUMBER = 0;
+  
+  private MyriadConfiguration cfg;
+  @SuppressWarnings("unused")
+  private TaskUtils taskUtils;
+  private ServiceCommandLineGenerator clGenerator;
+
+  @Inject
+  public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
+    this.cfg = cfg;
+    this.taskUtils = taskUtils;
+    this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull());
+  }
+
+  @Override
+  public TaskInfo createTask(Offer offer, FrameworkID frameworkId,
+    TaskID taskId, NodeTask nodeTask) {
+    Objects.requireNonNull(offer, "Offer should be non-null");
+    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
+
+    ServiceConfiguration serviceConfig = 
+        cfg.getServiceConfiguration(nodeTask.getTaskPrefix());
+    
+    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
+    Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null");
+    
+    final String serviceHostName = "0.0.0.0";
+    final String serviceEnv = serviceConfig.getEnvSettings();
+    final String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
+    List<Long> additionalPortsNumbers = null;
+    
+    final StringBuilder strB = new StringBuilder("env ");
+    if (serviceConfig.getServiceOpts() != null) {
+      strB.append(serviceConfig.getServiceOpts()).append("=");
+      
+      strB.append("\"");
+      if (rmHostName != null && !rmHostName.isEmpty()) {
+        strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " ");
+      }
+      
+      Map<String, Long> ports = serviceConfig.getPorts().orNull();
+      if (ports != null && !ports.isEmpty()) {
+        int neededPortsCount = 0;
+        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
+          Long port = portEntry.getValue();
+          if (port == DEFAULT_PORT_NUMBER) {
+            neededPortsCount++;
+          }
+        }
+        // use provided ports
+        additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount);
+        LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", additionalPortsNumbers);
+        int index = 0;
+        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
+          String portProperty = portEntry.getKey();
+          Long port = portEntry.getValue();
+          if (port == DEFAULT_PORT_NUMBER) {
+            port = additionalPortsNumbers.get(index++);
+          }
+          strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port + " ");
+        }
+      }     
+      strB.append(serviceEnv);
+      strB.append("\"");
+    }
+
+    strB.append(" ");
+    strB.append(serviceConfig.getCommand().get());
+    
+    CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), strB.toString());
+
+    LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString());
+    
+    Scalar taskMemory = Scalar.newBuilder()
+        .setValue(nodeTask.getProfile().getMemory())
+        .build();
+    Scalar taskCpus = Scalar.newBuilder()
+        .setValue(nodeTask.getProfile().getCpus())
+        .build();
+
+    TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
+    
+    taskBuilder.setName(nodeTask.getTaskPrefix())
+        .setTaskId(taskId)
+        .setSlaveId(offer.getSlaveId())
+        .addResources(
+            Resource.newBuilder().setName("cpus")
+            .setType(Value.Type.SCALAR)
+            .setScalar(taskCpus)
+            .build())
+        .addResources(
+            Resource.newBuilder().setName("mem")
+            .setType(Value.Type.SCALAR)
+            .setScalar(taskMemory)
+            .build());
+    
+    if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) {
+      // set ports
+      Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder();
+      for (Long port : additionalPortsNumbers) {
+        valueRanger.addRange(Value.Range.newBuilder()
+                .setBegin(port)
+                .setEnd(port));
+      }
+      
+      taskBuilder.addResources(Resource.newBuilder().setName("ports")
+          .setType(Value.Type.RANGES)
+          .setRanges(valueRanger.build()));
+    }
+    taskBuilder.setCommand(commandInfo);
+    return taskBuilder.build();
+  }
+
+  @VisibleForTesting
+  CommandInfo createCommandInfo(ServiceResourceProfile profile, String executorCmd) {
+    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
+    Map<String, String> envVars = cfg.getYarnEnvironment();
+    if (envVars != null && !envVars.isEmpty()) {
+      org.apache.mesos.Protos.Environment.Builder yarnHomeB = 
+          org.apache.mesos.Protos.Environment.newBuilder();
+      for (Map.Entry<String, String> envEntry : envVars.entrySet()) {
+        org.apache.mesos.Protos.Environment.Variable.Builder yarnEnvB = 
+            org.apache.mesos.Protos.Environment.Variable.newBuilder();
+        yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue());
+        yarnHomeB.addVariables(yarnEnvB.build());
+      }
+      commandInfo.mergeEnvironment(yarnHomeB.build());
+    }
+
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct.
+      if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) {
+        throw new RuntimeException("Trying to use remote distribution, but frameworkUser" +
+            "and/or frameworkSuperUser not set!");
+      }
+
+      LOGGER.info("Using remote distribution");
+      String clGeneratedCommand = clGenerator.generateCommandLine(profile, null);
+      
+      String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get();
+
+      //Concatenate all the subcommands
+      String cmd = clGeneratedCommand + " " + executorCmd;
+
+      //get the nodemanagerURI
+      //We're going to extract ourselves, so setExtract is false
+      LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
+      URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false)
+          .build();
+
+      //get configs directly from resource manager
+      String configUrlString = clGenerator.getConfigurationUrl();
+      LOGGER.info("Getting config from:" + configUrlString);
+      URI configUri = URI.newBuilder().setValue(configUrlString)
+          .build();
+
+      LOGGER.info("Slave will execute command:" + cmd);
+      commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd);
+      commandInfo.setUser(cfg.getFrameworkSuperUser().get());
+
+    } else {
+      commandInfo.setValue(executorCmd);
+    }
+    return commandInfo.build();
+  }
+
+  @Override
+  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId,
+      Offer offer, CommandInfo commandInfo) {
+    // TODO (yufeldman) if executor specified use it , otherwise return null
+    // nothing to implement here, since we are using default slave executor
+    return null;
+  }
+
+  /**
+   * Helper method to reserve ports
+   * @param offer
+   * @param requestedPorts
+   * @return
+   */
+  private List<Long> getAvailablePorts(Offer offer, int requestedPorts) {
+    if (requestedPorts == 0) {
+      return null;
+    }
+    final List<Long> returnedPorts = new ArrayList<>();
+    for (Resource resource : offer.getResourcesList()){
+      if (resource.getName().equals("ports")){
+        Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator();
+        while (itr.hasNext()) {
+          Value.Range range = itr.next();
+          if (range.getBegin() <= range.getEnd()) {
+            long i = range.getBegin();
+            while (i <= range.getEnd() && returnedPorts.size() < requestedPorts) {
+              returnedPorts.add(i);
+              i++;
+            }
+            if (returnedPorts.size() >= requestedPorts) { 
+              return returnedPorts;
+            }
+          }
+        }
+      }
+    }
+    // this is actually an error condition - we did not have enough ports to use
+    return returnedPorts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
new file mode 100644
index 0000000..5dbd894
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.scheduler;
+
+/**
+ * Generic interface to represent some constraints that task can impose
+ * while figuring out whether to accept or reject the offer
+ * We may start small and then eventually add more constraints
+ */
+public interface TaskConstraints {
+
+  /**
+   * Required number of ports
+   * @return portsNumber
+   */
+  public int portsCount();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
new file mode 100644
index 0000000..01aabc1
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
@@ -0,0 +1,31 @@
+package com.ebay.myriad.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory class to keep map of the constraints
+ *
+ */
+public class TaskConstraintsManager {
+
+  /**
+   * Since all the additions will happen during init time, there is no need to make this map Concurrent
+   * if/when later on it will change we may need to change HashMap to Concurrent one
+   */
+  private Map<String, TaskConstraints> taskConstraintsMap = new HashMap<>();
+  
+  public TaskConstraints getConstraints(String taskPrefix) {
+    return taskConstraintsMap.get(taskPrefix);
+  }
+  
+  public void addTaskConstraints(final String taskPrefix, final TaskConstraints taskConstraints) {
+    if (taskConstraints != null) {
+      taskConstraintsMap.put(taskPrefix, taskConstraints);
+    }
+  }
+  
+  public boolean exists(String taskPrefix) {
+    return taskConstraintsMap.containsKey(taskPrefix);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
index 908dca8..a1bbed4 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
@@ -18,11 +18,18 @@
  */
 package com.ebay.myriad.scheduler;
 
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Objects;
+
+import javax.inject.Inject;
+
 import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
 import com.ebay.myriad.state.NodeTask;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
 import org.apache.mesos.Protos.CommandInfo;
 import org.apache.mesos.Protos.CommandInfo.URI;
 import org.apache.mesos.Protos.ExecutorID;
@@ -37,16 +44,18 @@ import org.apache.mesos.Protos.Value.Scalar;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Objects;
 
 /**
  * Creates Tasks based on mesos offers
  */
 public interface TaskFactory {
-  TaskInfo createTask(Offer offer, FrameworkID frameworkId,
+  static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname";
+  static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address";
+  static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address";
+  static final String YARN_HTTP_POLICY = "yarn.http.policy";
+  static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
+
+  TaskInfo createTask(Offer offer, FrameworkID frameworkId, 
     TaskID taskId, NodeTask nodeTask);
 
   // TODO(Santosh): This is needed because the ExecutorInfo constructed
@@ -56,7 +65,7 @@ public interface TaskFactory {
   // ExecutorInfo, we wouldn't need this interface method.
   ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId,
     Offer offer, CommandInfo commandInfo);
-
+  
   /**
    * Creates TaskInfo objects to launch NMs as mesos tasks.
    */
@@ -64,16 +73,12 @@ public interface TaskFactory {
     public static final String EXECUTOR_NAME = "myriad_task";
     public static final String EXECUTOR_PREFIX = "myriad_executor";
     public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS";
-    private static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname";
-    private static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address";
-    private static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address";
-    private static final String YARN_HTTP_POLICY = "yarn.http.policy";
-    private static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class);
     private MyriadConfiguration cfg;
     private TaskUtils taskUtils;
     private ExecutorCommandLineGenerator clGenerator;
+    private TaskConstraints constraints;
 
     @Inject
     public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils,
@@ -81,6 +86,7 @@ public interface TaskFactory {
       this.cfg = cfg;
       this.taskUtils = taskUtils;
       this.clGenerator = clGenerator;
+      this.constraints = new NMTaskConstraints();
     }
 
     //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer
@@ -112,25 +118,8 @@ public interface TaskFactory {
       return new NMPorts(portArray);
     }
 
-    private String getConfigurationUrl() {
-      YarnConfiguration conf = new YarnConfiguration();
-      String httpPolicy = conf.get(YARN_HTTP_POLICY);
-      if (httpPolicy != null && httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) {
-        String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
-        if (address == null || address.isEmpty()) {
-          address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
-        }
-        return "https://" + address + "/conf";
-      } else {
-        String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
-        if (address == null || address.isEmpty()) {
-          address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
-        }
-        return "http://" + address + "/conf";
-      }
-    }
-
-    private CommandInfo getCommandInfo(NMProfile profile, NMPorts ports) {
+    @VisibleForTesting
+    CommandInfo getCommandInfo(ServiceResourceProfile profile, NMPorts ports) {
       MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
       CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
       String cmd;
@@ -150,7 +139,7 @@ public interface TaskFactory {
         URI nmUri = URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build();
 
         //get configs directly from resource manager
-        String configUrlString = getConfigurationUrl();
+        String configUrlString = clGenerator.getConfigurationUrl();
         LOGGER.info("Getting config from:" + configUrlString);
         URI configUri = URI.newBuilder().setValue(configUrlString)
           .build();
@@ -177,15 +166,15 @@ public interface TaskFactory {
       NMPorts ports = getPorts(offer);
       LOGGER.debug(ports.toString());
 
-      NMProfile profile = nodeTask.getProfile();
+      ServiceResourceProfile serviceProfile = nodeTask.getProfile();
       Scalar taskMemory = Scalar.newBuilder()
-          .setValue(taskUtils.getTaskMemory(profile))
+          .setValue(serviceProfile.getAggregateMemory())
           .build();
       Scalar taskCpus = Scalar.newBuilder()
-          .setValue(taskUtils.getTaskCpus(profile))
+          .setValue(serviceProfile.getAggregateCpu())
           .build();
 
-      CommandInfo commandInfo = getCommandInfo(profile, ports);
+      CommandInfo commandInfo = getCommandInfo(serviceProfile, ports);
       ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo);
 
       TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
@@ -254,4 +243,16 @@ public interface TaskFactory {
           .setExecutorId(executorId).build();
     }
   }
+  
+  /**
+   * Implement NM Task Constraints
+   *
+   */
+  public static class NMTaskConstraints implements TaskConstraints {
+
+    @Override
+    public int portsCount() {
+      return NMPorts.expectedNumPorts();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskUtils.java
index 26ca895..3dfeb44 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskUtils.java
@@ -18,11 +18,14 @@
  */
 package com.ebay.myriad.scheduler;
 
+import com.ebay.myriad.configuration.ServiceConfiguration;
+import com.ebay.myriad.configuration.MyriadBadConfigurationException;
 import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
 import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.executor.MyriadExecutorDefaults;
 import com.google.common.base.Optional;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -47,6 +50,7 @@ import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
@@ -184,4 +188,32 @@ public class TaskUtils {
         return getAggregateMemory(profile) - getExecutorMemory();
     }
 
+    public double getAuxTaskCpus(NMProfile profile, String taskName) throws MyriadBadConfigurationException {
+      if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+        return getAggregateCpus(profile);
+      }
+      ServiceConfiguration auxConf = cfg.getServiceConfiguration(taskName);
+      if (auxConf == null) {
+        throw new MyriadBadConfigurationException("Can not find profile for task name: " + taskName);
+      }
+      if (!auxConf.getCpus().isPresent()) {
+        throw new MyriadBadConfigurationException("cpu is not defined for task with name: " + taskName);
+      }
+      return auxConf.getCpus().get();
+    }
+    
+    public double getAuxTaskMemory(NMProfile profile, String taskName) throws MyriadBadConfigurationException {
+      if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+        return getAggregateMemory(profile);
+      }
+      ServiceConfiguration auxConf = cfg.getServiceConfiguration(taskName);
+      if (auxConf == null) {
+        throw new MyriadBadConfigurationException("Can not find profile for task name: " + taskName);
+      }
+      if (!auxConf.getJvmMaxMemoryMB().isPresent()) {
+        throw new MyriadBadConfigurationException("memory is not defined for task with name: " + taskName);        
+      }
+      return auxConf.getJvmMaxMemoryMB().get();
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 31a029c..4b0262a 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -18,9 +18,10 @@
  */
 package com.ebay.myriad.scheduler.event.handlers;
 
-import com.ebay.myriad.scheduler.NMPorts;
-import com.ebay.myriad.scheduler.NMProfile;
 import com.ebay.myriad.scheduler.SchedulerUtils;
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
+import com.ebay.myriad.scheduler.TaskConstraints;
+import com.ebay.myriad.scheduler.TaskConstraintsManager;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.TaskUtils;
 import com.ebay.myriad.scheduler.constraints.Constraint;
@@ -32,6 +33,7 @@ import com.ebay.myriad.state.SchedulerState;
 import com.lmax.disruptor.EventHandler;
 
 import java.util.Iterator;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Offer;
@@ -65,13 +67,16 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
   private SchedulerState schedulerState;
 
   @Inject
-  private TaskFactory taskFactory;
+  private TaskUtils taskUtils;
 
   @Inject
-  private TaskUtils taskUtils;
+  private Map<String, TaskFactory> taskFactoryMap;
 
   @Inject
   private OfferLifecycleManager offerLifecycleMgr;
+  
+  @Inject
+  private TaskConstraintsManager taskConstraintsManager;
 
   @Override
   public void onEvent(ResourceOffersEvent event, long sequence,
@@ -95,8 +100,8 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     try {
       for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) {
         Offer offer = iterator.next();
-        NodeTask nodeTask = schedulerState.getNodeTask(offer.getSlaveId());
-        if (nodeTask != null) {
+        Set<NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId());
+        for (NodeTask nodeTask : nodeTasks) {
           nodeTask.setSlaveAttributes(offer.getAttributesList());
         }
         Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
@@ -104,34 +109,39 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
           for (Protos.TaskID pendingTaskId : pendingTasks) {
             NodeTask taskToLaunch = schedulerState
                 .getTask(pendingTaskId);
-            NMProfile profile = taskToLaunch.getProfile();
+            String taskPrefix = taskToLaunch.getTaskPrefix();
+            ServiceResourceProfile profile = taskToLaunch.getProfile();
             Constraint constraint = taskToLaunch.getConstraint();
 
-            if (matches(offer, profile, constraint)
-                && SchedulerUtils.isUniqueHostname(offer,
+            if (matches(offer, taskToLaunch, constraint)
+                && SchedulerUtils.isUniqueHostname(offer, taskToLaunch,
                 schedulerState.getActiveTasks())) {
-              TaskInfo task = taskFactory.createTask(offer, schedulerState.getFrameworkID(), pendingTaskId,
-                  taskToLaunch);
-
-              List<OfferID> offerIds = new ArrayList<>();
-              offerIds.add(offer.getId());
-              List<TaskInfo> tasks = new ArrayList<>();
-              tasks.add(task);
-              LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
-              LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
-              driver.launchTasks(offerIds, tasks);
-              schedulerState.makeTaskStaging(pendingTaskId);
-
-              // For every NM Task that we launch, we currently
-              // need to backup the ExecutorInfo for that NM Task in the State Store.
-              // Without this, we will not be able to launch tasks corresponding to yarn
-              // containers. This is specially important in case the RM restarts.
-              taskToLaunch.setExecutorInfo(task.getExecutor());
-              taskToLaunch.setHostname(offer.getHostname());
-              taskToLaunch.setSlaveId(offer.getSlaveId());
-              schedulerState.addTask(pendingTaskId, taskToLaunch);
-              iterator.remove(); // remove the used offer from offers list
-              break;
+              try {
+                final TaskInfo task = 
+                      taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId,
+                      taskToLaunch);
+                List<OfferID> offerIds = new ArrayList<>();
+                offerIds.add(offer.getId());
+                List<TaskInfo> tasks = new ArrayList<>();
+                tasks.add(task);
+                LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
+                LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
+                driver.launchTasks(offerIds, tasks);
+                schedulerState.makeTaskStaging(pendingTaskId);
+  
+                // For every NM Task that we launch, we currently
+                // need to backup the ExecutorInfo for that NM Task in the State Store.
+                // Without this, we will not be able to launch tasks corresponding to yarn
+                // containers. This is specially important in case the RM restarts.
+                taskToLaunch.setExecutorInfo(task.getExecutor());
+                taskToLaunch.setHostname(offer.getHostname());
+                taskToLaunch.setSlaveId(offer.getSlaveId());
+                schedulerState.addTask(pendingTaskId, taskToLaunch);
+                iterator.remove(); // remove the used offer from offers list
+                break;
+              } catch (Throwable t) {
+                LOGGER.error("Exception thrown while trying to create a task for {}", taskPrefix, t);
+              }
             }
           }
         }
@@ -156,12 +166,10 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     }
   }
 
-  private boolean matches(Offer offer, NMProfile profile, Constraint constraint) {
-
+  private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) {
     if (!meetsConstraint(offer, constraint)) {
       return false;
     }
-
     Map<String, Object> results = new HashMap<String, Object>(5);
 
     for (Resource resource : offer.getResourcesList()) {
@@ -180,9 +188,26 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     checkResource(mem < 0, "mem");
     checkResource(ports < 0, "port");
 
-    return checkAggregates(profile, ports, cpus, mem);
+    return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
   }
 
+    private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) {
+        final ServiceResourceProfile profile = taskToLaunch.getProfile();
+        final String taskPrefix = taskToLaunch.getTaskPrefix();
+        final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu();
+        final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory();
+        final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix);
+        if (aggrCpu <= cpus
+            && aggrMem <= mem
+            && taskConstraints.portsCount() <= ports) {
+            return true;
+        } else {
+            LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}",
+                aggrCpu, aggrMem, ports);
+            return false;
+        }
+    }
+
   private boolean meetsConstraint(Offer offer, Constraint constraint) {
     if (constraint != null) {
       switch (constraint.getType()) {
@@ -195,6 +220,8 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
             return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
           }
         }
+      default:
+        return false;
       }
     }
     return true;
@@ -206,20 +233,6 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     }
   }
 
-  private boolean checkAggregates(NMProfile profile, int ports, double cpus, double mem) {
-
-    if (taskUtils.getAggregateCpus(profile) <= cpus
-        && taskUtils.getAggregateMemory(profile) <= mem
-        && NMPorts.expectedNumPorts() <= ports) {
-      return true;
-    } else {
-      LOGGER.info("Offer not sufficient for launching task. Task requires cpu: {}, memory: {}, # of ports: {}. " +
-          "Offer has cpu: {}, memory: {}, # of ports: {}", taskUtils.getAggregateCpus(profile),
-          taskUtils.getAggregateMemory(profile), NMPorts.expectedNumPorts(), cpus, mem, ports);
-      return false;
-    }
-  }
-
   private static Double scalarToDouble(Resource resource, String id) {
     Double value = new Double(0.0);
     if (resource.getType().equals(Value.Type.SCALAR)) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 9c9c22c..a987885 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -18,8 +18,10 @@
  */
 package com.ebay.myriad.scheduler.fgs;
 
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.executor.ContainerTaskStatusRequest;
 import com.ebay.myriad.scheduler.MyriadDriver;
+import com.ebay.myriad.scheduler.NMTaskFactoryAnnotation;
 import com.ebay.myriad.scheduler.SchedulerUtils;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
@@ -27,11 +29,13 @@ import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import javax.inject.Inject;
+
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -78,7 +82,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
                                    AbstractYarnScheduler yarnScheduler,
                                    RMContext rmContext,
                                    MyriadDriver myriadDriver,
-                                   TaskFactory taskFactory,
+                                   @NMTaskFactoryAnnotation TaskFactory taskFactory,
                                    OfferLifecycleManager offerLifecycleMgr,
                                    NodeStore nodeStore,
                                    SchedulerState state) {
@@ -233,7 +237,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
         Protos.ExecutorInfo executorInfo = node.getExecInfo();
         if (executorInfo == null) {
             executorInfo = Protos.ExecutorInfo.newBuilder(
-                 state.getNodeTask(offer.getSlaveId()).getExecutorInfo())
+                 state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo())
                 .setFrameworkId(offer.getFrameworkId()).build();
             node.setExecInfo(executorInfo);
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
index 1436b97..354c575 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
@@ -18,10 +18,13 @@
  */
 package com.ebay.myriad.state;
 
-import com.ebay.myriad.scheduler.NMProfile;
 import com.ebay.myriad.scheduler.constraints.Constraint;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
+import com.ebay.myriad.scheduler.TaskUtils;
+import com.google.inject.Inject;
+
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Attribute;
 
@@ -30,14 +33,18 @@ import org.apache.mesos.Protos.Attribute;
  */
 public class NodeTask {
     @JsonProperty
-    private NMProfile profile;
-    @JsonProperty
     private String hostname;
     @JsonProperty
     private Protos.SlaveID slaveId;
     @JsonProperty
     private Protos.TaskStatus taskStatus;
+    @JsonProperty
+    private String taskPrefix;
+    @JsonProperty
+    private ServiceResourceProfile serviceresourceProfile;
 
+    @Inject
+    TaskUtils taskUtils;
     /**
      * Mesos executor for this node.
      */
@@ -46,8 +53,8 @@ public class NodeTask {
     private Constraint constraint;
     private List<Attribute> slaveAttributes;
 
-    public NodeTask(NMProfile profile, Constraint constraint) {
-        this.profile = profile;
+    public NodeTask(ServiceResourceProfile profile, Constraint constraint) {
+        this.serviceresourceProfile = profile;
         this.hostname = "";
         this.constraint = constraint;
     }
@@ -60,14 +67,6 @@ public class NodeTask {
         this.slaveId = slaveId;
     }
 
-    public NMProfile getProfile() {
-        return profile;
-    }
-
-    public void setProfile(NMProfile profile) {
-        this.profile = profile;
-    }
-
     public Constraint getConstraint() {
       return constraint;
     }
@@ -103,4 +102,20 @@ public class NodeTask {
     public List<Attribute> getSlaveAttributes() {
       return slaveAttributes;
     }
+
+    public String getTaskPrefix() {
+      return taskPrefix;
+    }
+
+    public void setTaskPrefix(String taskPrefix) {
+      this.taskPrefix = taskPrefix;
+    }
+
+    public ServiceResourceProfile getProfile() {
+      return serviceresourceProfile;
+    }
+
+    public void setProfile(ServiceResourceProfile serviceresourceProfile) {
+      this.serviceresourceProfile = serviceresourceProfile;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index ff0a33c..c3a3b7d 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -18,40 +18,45 @@
  */
 package com.ebay.myriad.state;
 
-import com.ebay.myriad.scheduler.NMProfile;
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
 import com.ebay.myriad.state.utils.StoreContext;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.SlaveID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Represents the state of the Myriad scheduler
  */
 public class SchedulerState {
     private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class);
 
+    private static Pattern taskIdPattern = Pattern.compile("\\.");
+    
     private Map<Protos.TaskID, NodeTask> tasks;
-    private Set<Protos.TaskID> pendingTasks;
-    private Set<Protos.TaskID> stagingTasks;
-    private Set<Protos.TaskID> activeTasks;
-    private Set<Protos.TaskID> lostTasks;
-    private Set<Protos.TaskID> killableTasks;
     private Protos.FrameworkID frameworkId;
     private MyriadStateStore stateStore;
+    private Map<String, SchedulerStateForType> statesForTaskType;
 
     public SchedulerState(MyriadStateStore stateStore) {
         this.tasks = new ConcurrentHashMap<>();
-        this.pendingTasks = new HashSet<>();
-        this.stagingTasks = new HashSet<>();
-        this.activeTasks = new HashSet<>();
-        this.lostTasks = new HashSet<>();
-        this.killableTasks = new HashSet<>();
         this.stateStore = stateStore;
+        this.statesForTaskType = new ConcurrentHashMap<>();
         loadStateStore();
     }
 
@@ -61,10 +66,12 @@ public class SchedulerState {
             return;
         }
         for (NodeTask node : nodes) {
-            Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("nm.%s.%s", node.getProfile().getName(), UUID.randomUUID()))
+            Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID()))
                     .build();
             addTask(taskId, node);
-            LOGGER.info("Marked taskId {} pending, size of pending queue {}", taskId.getValue(), this.pendingTasks.size());
+            SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix());
+            LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), 
+                (taskState == null ? 0 : taskState.getPendingTaskIds().size()));
             makeTaskPending(taskId);
         }
 
@@ -88,120 +95,156 @@ public class SchedulerState {
     public synchronized void makeTaskPending(Protos.TaskID taskId) {
         Objects.requireNonNull(taskId,
                 "taskId cannot be empty or null");
-        pendingTasks.add(taskId);
-        stagingTasks.remove(taskId);
-        activeTasks.remove(taskId);
-        lostTasks.remove(taskId);
-        killableTasks.remove(taskId);
+        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+        if (taskTypeState == null) {
+          taskTypeState = new SchedulerStateForType(taskPrefix);
+          statesForTaskType.put(taskPrefix, taskTypeState);
+        }
+        taskTypeState.makeTaskPending(taskId);
         updateStateStore();
     }
 
     public synchronized void makeTaskStaging(Protos.TaskID taskId) {
         Objects.requireNonNull(taskId,
                 "taskId cannot be empty or null");
-
-        pendingTasks.remove(taskId);
-        stagingTasks.add(taskId);
-        activeTasks.remove(taskId);
-        lostTasks.remove(taskId);
-        killableTasks.remove(taskId);
+        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+        if (taskTypeState == null) {
+          taskTypeState = new SchedulerStateForType(taskPrefix);
+          statesForTaskType.put(taskPrefix, taskTypeState);
+        }
+        taskTypeState.makeTaskStaging(taskId);
         updateStateStore();
     }
 
     public synchronized void makeTaskActive(Protos.TaskID taskId) {
         Objects.requireNonNull(taskId,
                 "taskId cannot be empty or null");
-
-        pendingTasks.remove(taskId);
-        stagingTasks.remove(taskId);
-        activeTasks.add(taskId);
-        lostTasks.remove(taskId);
-        killableTasks.remove(taskId);
+        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+        if (taskTypeState == null) {
+          taskTypeState = new SchedulerStateForType(taskPrefix);
+          statesForTaskType.put(taskPrefix, taskTypeState);
+        }
+        taskTypeState.makeTaskActive(taskId);
         updateStateStore();
     }
 
     public synchronized void makeTaskLost(Protos.TaskID taskId) {
         Objects.requireNonNull(taskId,
                 "taskId cannot be empty or null");
-
-        pendingTasks.remove(taskId);
-        stagingTasks.remove(taskId);
-        activeTasks.remove(taskId);
-        lostTasks.add(taskId);
-        killableTasks.remove(taskId);
+        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+        if (taskTypeState == null) {
+          taskTypeState = new SchedulerStateForType(taskPrefix);
+          statesForTaskType.put(taskPrefix, taskTypeState);
+        }
+        taskTypeState.makeTaskLost(taskId);
         updateStateStore();
     }
 
     public synchronized void makeTaskKillable(Protos.TaskID taskId) {
         Objects.requireNonNull(taskId,
                 "taskId cannot be empty or null");
-
-        pendingTasks.remove(taskId);
-        stagingTasks.remove(taskId);
-        activeTasks.remove(taskId);
-        lostTasks.remove(taskId);
-        killableTasks.add(taskId);
+        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+        if (taskTypeState == null) {
+          taskTypeState = new SchedulerStateForType(taskPrefix);
+          statesForTaskType.put(taskPrefix, taskTypeState);
+        }
+        taskTypeState.makeTaskKillable(taskId);
         updateStateStore();
     }
 
-    public synchronized Set<Protos.TaskID> getKillableTasks() {
-        return Collections.unmodifiableSet(this.killableTasks);
-    }
-
     // TODO (sdaingade) Clone NodeTask
     public synchronized NodeTask getTask(Protos.TaskID taskId) {
         return this.tasks.get(taskId);
     }
 
+    public synchronized Set<Protos.TaskID> getKillableTasks() {
+      Set<Protos.TaskID> returnSet = new HashSet<>();
+      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+        returnSet.addAll(entry.getValue().getKillableTasks());
+      }
+      return returnSet;
+    }
+
+    public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
+      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
+    }
+    
     public synchronized void removeTask(Protos.TaskID taskId) {
-        this.pendingTasks.remove(taskId);
-        this.stagingTasks.remove(taskId);
-        this.activeTasks.remove(taskId);
-        this.lostTasks.remove(taskId);
-        this.killableTasks.remove(taskId);
-        this.tasks.remove(taskId);
-        updateStateStore();
+      String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+      SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+      if (taskTypeState != null) {
+        taskTypeState.removeTask(taskId);
+      }
+      this.tasks.remove(taskId);
+      updateStateStore();
     }
 
     public synchronized Set<Protos.TaskID> getPendingTaskIds() {
-        return Collections.unmodifiableSet(this.pendingTasks);
+      Set<Protos.TaskID> returnSet = new HashSet<>();
+      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+        returnSet.addAll(entry.getValue().getPendingTaskIds());
+      }
+      return returnSet;
     }
 
-    public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(NMProfile profile) {
+    public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
       List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
+      Set<Protos.TaskID> pendingTasks = getPendingTaskIds();
       for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
         NodeTask nodeTask = entry.getValue();
-        if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) {
+        if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
           pendingTaskIds.add(entry.getKey());
         }
       }
       return Collections.unmodifiableCollection(pendingTaskIds);
     }
 
+    public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) {
+      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds());  
+    }
+    
     public synchronized Set<Protos.TaskID> getActiveTaskIds() {
-        return Collections.unmodifiableSet(this.activeTasks);
+      Set<Protos.TaskID> returnSet = new HashSet<>();
+      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+        returnSet.addAll(entry.getValue().getActiveTaskIds());
+      }
+      return returnSet;
+    }
+    
+    public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) {
+      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds());
     }
 
     public synchronized Collection<NodeTask> getActiveTasks() {
-        List<NodeTask> activeNodeTasks = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(activeTasks)
-                && CollectionUtils.isNotEmpty(tasks.values())) {
-            for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-                if (activeTasks.contains(entry.getKey())) {
-                    activeNodeTasks.add(entry.getValue());
-                }
-            }
+      List<NodeTask> activeNodeTasks = new ArrayList<>();
+      Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
+      if (activeTaskIds.isEmpty()) {
+        return activeNodeTasks;
+      }
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        if (activeTaskIds.contains(entry.getKey())) {
+          activeNodeTasks.add(entry.getValue());
         }
-        return Collections.unmodifiableCollection(activeNodeTasks);
+      }
+      return activeNodeTasks;
     }
 
-    public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(NMProfile profile) {
+    public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
       List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
-      if (CollectionUtils.isNotEmpty(activeTasks)
+      Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
+      if (CollectionUtils.isNotEmpty(activeTaskIds)
           && CollectionUtils.isNotEmpty(tasks.values())) {
         for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
           NodeTask nodeTask = entry.getValue();
-          if (activeTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) {
+          if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
             activeTaskIDs.add(entry.getKey());
           }
         }
@@ -209,34 +252,86 @@ public class SchedulerState {
       return Collections.unmodifiableCollection(activeTaskIDs);
     }
 
-  // TODO (sdaingade) Clone NodeTask
-    public synchronized NodeTask getNodeTask(SlaveID slaveId) {
-        for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-            if (entry.getValue().getSlaveId() != null &&
-                entry.getValue().getSlaveId().equals(slaveId)) {
-                return entry.getValue(); 
-            }
+    public Collection<NodeTask> getActiveTasksByType(String taskPrefix) {
+      List<NodeTask> activeNodeTasks = new ArrayList<>();
+      Set<Protos.TaskID> activeTaskIds = getActiveTaskIds(taskPrefix);
+      if (activeTaskIds.isEmpty()) {
+        return activeNodeTasks;
+      }
+
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        if (activeTaskIds.contains(entry.getKey())) {
+          activeNodeTasks.add(entry.getValue());
         }
+      }
+      return activeNodeTasks;
+   }
+
+   // TODO (sdaingade) Clone NodeTask
+    public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) {
+      if (taskPrefix == null) {
         return null;
+      }
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        final NodeTask task = entry.getValue();
+        if (task.getSlaveId() != null &&
+            task.getSlaveId().equals(slaveId) &&
+            taskPrefix.equals(task.getTaskPrefix())) {
+            return entry.getValue(); 
+        }
+      }
+      return null;
     }
 
-    public synchronized Set<Protos.TaskID> getStagingTaskIds() {
-        return Collections.unmodifiableSet(this.stagingTasks);
+    public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) {
+      Set<NodeTask> nodeTasks = Sets.newHashSet();
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        final NodeTask task = entry.getValue();
+        if (task.getSlaveId() != null &&
+            task.getSlaveId().equals(slaveId)) {
+          nodeTasks.add(entry.getValue()); 
+        }
+      }
+      return nodeTasks;
+    }
+
+    public Set<Protos.TaskID> getStagingTaskIds() {
+      Set<Protos.TaskID> returnSet = new HashSet<>();
+      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+        returnSet.addAll(entry.getValue().getStagingTaskIds());
+      }
+      return returnSet;
     }
 
-    public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(NMProfile profile) {
+    public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
       List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
+      
+      Set<Protos.TaskID> stagingTasks = getStagingTaskIds();
       for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
         NodeTask nodeTask = entry.getValue();
-        if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) {
+        if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
           stagingTaskIDs.add(entry.getKey());
         }
       }
       return Collections.unmodifiableCollection(stagingTaskIDs);
     }
 
-  public synchronized Set<Protos.TaskID> getLostTaskIds() {
-        return Collections.unmodifiableSet(this.lostTasks);
+    public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) {
+      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds());
+    }
+    
+    public Set<Protos.TaskID> getLostTaskIds() {
+      Set<Protos.TaskID> returnSet = new HashSet<>();
+      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+        returnSet.addAll(entry.getValue().getLostTaskIds());
+      }
+      return returnSet;
+    }
+    
+    public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) {
+      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds());
     }
 
     // TODO (sdaingade) Currently cannot return unmodifiableCollection
@@ -274,8 +369,8 @@ public class SchedulerState {
         }
 
         try {
-            StoreContext sc = new StoreContext(frameworkId, tasks, pendingTasks,
-                stagingTasks, activeTasks, lostTasks, killableTasks);
+            StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(),
+                getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks());
             stateStore.storeMyriadState(sc);
         } catch (Exception e) {
             LOGGER.error("Failed to update scheduler state to state store", e);
@@ -293,23 +388,168 @@ public class SchedulerState {
             if (sc != null) {
                 this.frameworkId = sc.getFrameworkId();
                 this.tasks.putAll(sc.getTasks());
-                this.pendingTasks.addAll(sc.getPendingTasks());
-                this.stagingTasks.addAll(sc.getStagingTasks());
-                this.activeTasks.addAll(sc.getActiveTasks());
-                this.lostTasks.addAll(sc.getLostTasks());
-                this.killableTasks.addAll(sc.getKillableTasks());
-
+                convertToThis(TaskState.PENDING, sc.getPendingTasks());
+                convertToThis(TaskState.STAGING, sc.getStagingTasks());
+                convertToThis(TaskState.ACTIVE, sc.getActiveTasks());
+                convertToThis(TaskState.LOST, sc.getLostTasks());
+                convertToThis(TaskState.KILLABLE, sc.getKillableTasks());
                 LOGGER.info("Loaded Myriad state from state store successfully.");
                 LOGGER.debug("State Store state includes " +
                   "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
                   "active tasks count: {}, lost tasks count: {}, " +
                   "and killable tasks count: {}", frameworkId.getValue(),
-                  this.pendingTasks.size(), this.stagingTasks.size(),
-                  this.activeTasks.size(), this.lostTasks.size(),
-                  this.killableTasks.size());
+                  this.getPendingTaskIds().size(), this.getStagingTaskIds().size(),
+                  this.getActiveTaskIds().size(), this.getLostTaskIds().size(),
+                  this.getKillableTasks().size());
             }
         }  catch (Exception e) {
             LOGGER.error("Failed to read scheduler state from state store", e);
         }
    }
+
+   private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) {
+     for (Protos.TaskID taskId : taskIds) {
+       String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+       SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+       if (taskTypeState == null) {
+         taskTypeState = new SchedulerStateForType(taskPrefix);
+         statesForTaskType.put(taskPrefix, taskTypeState);
+       }
+       switch(taskType) {
+       case PENDING:
+         taskTypeState.makeTaskPending(taskId);
+         break;
+       case STAGING:
+         taskTypeState.makeTaskStaging(taskId);
+         break;
+       case ACTIVE:
+         taskTypeState.makeTaskActive(taskId);
+         break;
+       case KILLABLE:
+         taskTypeState.makeTaskKillable(taskId);
+         break;
+       case LOST:
+         taskTypeState.makeTaskLost(taskId);
+         break;
+       }
+     }
+   }
+   /**
+    * Class to keep all the tasks states for a particular taskPrefix together
+    *
+    */
+   private static class SchedulerStateForType {
+      
+    private final String taskPrefix;
+    private Set<Protos.TaskID> pendingTasks;
+    private Set<Protos.TaskID> stagingTasks;
+    private Set<Protos.TaskID> activeTasks;
+    private Set<Protos.TaskID> lostTasks;
+    private Set<Protos.TaskID> killableTasks;
+
+    public SchedulerStateForType(String taskPrefix) {
+      this.taskPrefix = taskPrefix;
+      this.pendingTasks = new HashSet<>();
+      this.stagingTasks = new HashSet<>();
+      this.activeTasks = new HashSet<>();
+      this.lostTasks = new HashSet<>();
+      this.killableTasks = new HashSet<>();
+
+    }
+    @SuppressWarnings("unused")
+    public String getTaskPrefix() {
+      return taskPrefix;
+    }
+    
+    public synchronized void makeTaskPending(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId,
+              "taskId cannot be empty or null");
+      
+      pendingTasks.add(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskStaging(Protos.TaskID taskId) {
+        Objects.requireNonNull(taskId,
+                "taskId cannot be empty or null");
+        pendingTasks.remove(taskId);
+        stagingTasks.add(taskId);
+        activeTasks.remove(taskId);
+        lostTasks.remove(taskId);
+        killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskActive(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId,
+              "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.add(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskLost(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId,
+              "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.add(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskKillable(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId,
+              "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.add(taskId);
+    }
+    
+    public synchronized void removeTask(Protos.TaskID taskId) {
+      this.pendingTasks.remove(taskId);
+      this.stagingTasks.remove(taskId);
+      this.activeTasks.remove(taskId);
+      this.lostTasks.remove(taskId);
+      this.killableTasks.remove(taskId);
+    }
+    
+    public synchronized Set<Protos.TaskID> getPendingTaskIds() {
+      return Collections.unmodifiableSet(this.pendingTasks);
+    }
+
+    public Set<Protos.TaskID> getActiveTaskIds() {
+      return Collections.unmodifiableSet(this.activeTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getStagingTaskIds() {
+      return Collections.unmodifiableSet(this.stagingTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getLostTaskIds() {
+      return Collections.unmodifiableSet(this.lostTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getKillableTasks() {
+      return Collections.unmodifiableSet(this.killableTasks);
+    }
+
+  }
+   /**
+    * TaskState type
+    *
+    */
+   public enum TaskState {
+     PENDING,
+     STAGING,
+     ACTIVE,
+     KILLABLE,
+     LOST
+   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index 5c8d083..4711a52 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -28,11 +28,14 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.mesos.Protos;
-import com.ebay.myriad.scheduler.NMProfile;
+
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
 import com.ebay.myriad.state.NodeTask;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.protobuf.GeneratedMessage;
 
 /**
@@ -44,7 +47,10 @@ public class ByteBufferSupport {
   public static final String UTF8 = "UTF-8";
   public static final byte[] ZERO_BYTES = new byte[0];
   private static Gson gson = new Gson();
-
+  private static Gson gsonCustom = new GsonBuilder().
+      registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).
+      create();
+  
   public static void addByteBuffers(List<ByteBuffer> list,
     ByteArrayOutputStream bytes) throws IOException {
     // If list, add the list size, then the size of each buffer followed by the buffer.
@@ -99,9 +105,10 @@ public class ByteBufferSupport {
     return bb.array();
   }
 
+
   public static ByteBuffer toByteBuffer(NodeTask nt) {
     // Determine the size of ByteBuffer to allocate
-    // The NMProfile toString() returns Json, if this ever changes then this
+    // The ServiceResourceProfile toString() returns Json, if this ever changes then this
     // will fail. Json is expected.
     byte[] profile = toBytes(nt.getProfile().toString());
     int size = profile.length + INT_SIZE;
@@ -139,6 +146,12 @@ public class ByteBufferSupport {
         size += INT_SIZE;
     }
 
+    byte[] taskPrefixBytes = ZERO_BYTES;
+    if (nt.getTaskPrefix() != null) {
+      taskPrefixBytes = toBytes(nt.getTaskPrefix());
+      size += taskPrefixBytes.length + INT_SIZE;
+    }
+    
     // Allocate and populate the buffer.
     ByteBuffer bb = createBuffer(size);
     putBytes(bb, profile);
@@ -148,6 +161,7 @@ public class ByteBufferSupport {
     putBytes(bb, getSlaveBytes(nt));
     putBytes(bb, getTaskBytes(nt));
     putBytes(bb, getExecutorInfoBytes(nt));
+    putBytes(bb, taskPrefixBytes);
     // Make sure the buffer is at the beginning
     bb.rewind();
     return bb;
@@ -191,7 +205,7 @@ public class ByteBufferSupport {
   public static NodeTask toNodeTask(ByteBuffer bb) {
     NodeTask nt = null;
     if (bb != null && bb.array().length > 0) {
-      nt = new NodeTask(getProfile(bb), getConstraint(bb));
+      nt = new NodeTask(getServiceResourceProfile(bb), getConstraint(bb));
       nt.setHostname(toString(bb));
       nt.setSlaveId(toSlaveId(bb));
       nt.setTaskStatus(toTaskStatus(bb));
@@ -269,10 +283,10 @@ public class ByteBufferSupport {
     }
   }
 
-  public static NMProfile getProfile(ByteBuffer bb) {
+  public static ServiceResourceProfile getServiceResourceProfile(ByteBuffer bb) {
     String p = toString(bb);
     if (!StringUtils.isEmpty(p)) {
-      return gson.fromJson(p, NMProfile.class);
+      return gsonCustom.fromJson(p, ServiceResourceProfile.class);
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
index e938ca8..364a4c3 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.TaskID;
 
@@ -40,6 +42,7 @@ import com.ebay.myriad.state.NodeTask;
 * alternative approach.
 */
 public final class StoreContext {
+  private static Pattern taskIdPattern = Pattern.compile("\\.");
   private ByteBuffer frameworkId;
   private List<ByteBuffer> taskIds;
   private List<ByteBuffer> taskNodes;
@@ -189,8 +192,13 @@ public final class StoreContext {
       map = new HashMap<Protos.TaskID, NodeTask>(taskIds.size());
       int idx = 0;
       for (ByteBuffer bb : taskIds) {
-        map.put(ByteBufferSupport.toTaskId(bb),
-          ByteBufferSupport.toNodeTask(taskNodes.get(idx++)));
+        final Protos.TaskID taskId = ByteBufferSupport.toTaskId(bb);
+        final NodeTask task = ByteBufferSupport.toNodeTask(taskNodes.get(idx++));
+        if (task.getTaskPrefix() == null && taskId != null) {
+          String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+          task.setTaskPrefix(taskPrefix);
+        }
+        map.put(taskId, task);
       }
     } else {
       map = new HashMap<Protos.TaskID, NodeTask>(0);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
new file mode 100644
index 0000000..1bbd175
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.ebay.myriad.scheduler.TaskFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Test for Multibindings
+ *
+ */
+public class MultiBindingsTest {
+
+  private static Injector injector;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    MyriadTestModule myriadModule = new MyriadTestModule();
+    injector = Guice.createInjector(
+            myriadModule);
+
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Test
+  public void multiBindingsTest() {
+    
+    
+    MultiBindingsUsage myinstance = injector.getInstance(MultiBindingsUsage.class);
+    
+    Map<String, TaskFactory> taskMap = myinstance.getMap();
+    assertNotNull(taskMap);
+    assertEquals(3, taskMap.size());
+    
+    taskMap = myinstance.getMap();
+    for (Map.Entry<String, TaskFactory> entry : taskMap.entrySet()) {
+      String keyName = entry.getKey();
+      TaskFactory taskFactory = entry.getValue();
+      System.out.println(taskFactory);
+    }
+    
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
new file mode 100644
index 0000000..aea29ed
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import com.ebay.myriad.scheduler.TaskFactory;
+
+/**
+ * Helper class to test multibindings
+ *
+ */
+public class MultiBindingsUsage {
+
+  @Inject
+  private Map<String, TaskFactory> taskFactoryMap;
+
+  public Map<String, TaskFactory> getMap() {
+    return taskFactoryMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
new file mode 100644
index 0000000..a7f159f
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
@@ -0,0 +1,109 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ebay.myriad.configuration.ServiceConfiguration;
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
+import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import com.ebay.myriad.scheduler.DownloadNMExecutorCLGenImpl;
+import com.ebay.myriad.scheduler.ExecutorCommandLineGenerator;
+import com.ebay.myriad.scheduler.NMExecutorCLGenImpl;
+import com.ebay.myriad.scheduler.ServiceTaskFactoryImpl;
+import com.ebay.myriad.scheduler.TaskFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
+
+/**
+ * AbstractModule extension for UnitTests
+ *
+ */
+public class MyriadTestModule extends AbstractModule {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadTestModule.class);
+  
+  private MyriadConfiguration cfg;
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void configure() {
+    
+    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+    try {
+      cfg = mapper.readValue(
+              Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"),
+              MyriadConfiguration.class);
+    } catch (IOException e1) {
+      LOGGER.error("IOException", e1);
+      return;
+    }
+
+    if (cfg == null) {
+      return;
+    }
+    
+    bind(MyriadConfiguration.class).toInstance(cfg);
+
+    MapBinder<String, TaskFactory> mapBinder
+    = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
+    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+    Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
+    for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+      String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
+      if (taskFactoryClass != null) {
+        try {
+          Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
+          mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
+        } catch (ClassNotFoundException e) {
+          e.printStackTrace();
+        }
+      } else {
+        mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+      }
+    }
+  }
+
+  @Provides
+  @Singleton
+  ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
+      ExecutorCommandLineGenerator cliGenerator = null;
+      MyriadExecutorConfiguration myriadExecutorConfiguration =
+          cfg.getMyriadExecutorConfiguration();
+      if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+          cliGenerator = new DownloadNMExecutorCLGenImpl(cfg,
+             myriadExecutorConfiguration.getNodeManagerUri().get());
+      } else {
+          cliGenerator = new NMExecutorCLGenImpl(cfg);
+      }
+      return cliGenerator;
+  }    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
new file mode 100644
index 0000000..bae4ff2
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
@@ -0,0 +1,49 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.configuration;
+
+import static org.junit.Assert.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Class to test MyriadBadConfigurationException
+ *
+ */
+public class MyriadBadConfigurationExceptionTest {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Test
+  public void myriadExceptionTest() {
+    final String testStr = "com.ebay.myriad.configuration.MyriadBadConfigurationException: Bad configuration exception";
+    MyriadBadConfigurationException exp = new MyriadBadConfigurationException("Bad configuration exception");
+    
+    assertEquals(testStr, exp.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
new file mode 100644
index 0000000..b7a4cd9
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.configuration;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+/**
+ * AuxServices/tasks test
+ *
+ */
+public class MyriadConfigurationTest {
+
+  static MyriadConfiguration cfg;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+    cfg = mapper.readValue(
+            Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"),
+            MyriadConfiguration.class);
+
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Test
+  public void additionalPropertiestest() throws Exception {
+    
+    Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations();
+    
+    assertNotNull(auxConfigs);
+    assertEquals(auxConfigs.size(), 2);
+    
+    for (Map.Entry<String, ServiceConfiguration> entry : auxConfigs.entrySet()) {
+      String taskName = entry.getKey();
+      ServiceConfiguration config = entry.getValue();
+      String outTaskname = config.getTaskName();
+      assertEquals(taskName, outTaskname);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
index 305021a..efd91ff 100644
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
@@ -16,16 +16,16 @@ class SchedulerUtilsSpec extends Specification {
         offer.getHostname() >> "hostname"
 
         expect:
-        returnValue == SchedulerUtils.isUniqueHostname(offer, tasks)
+        returnValue == SchedulerUtils.isUniqueHostname(offer, launchTask, tasks)
 
         where:
-        tasks                                              | returnValue
-        []                                                 | true
-        null                                               | true
-        createNodeTaskList("hostname")                     | false
-        createNodeTaskList("missinghost")                  | true
-        createNodeTaskList("missinghost1", "missinghost2") | true
-        createNodeTaskList("missinghost1", "hostname")     | false
+        tasks                                              | launchTask 					| returnValue
+        []                                                 | null							| true
+        null                                               | null							| true
+        createNodeTaskList("hostname")                     | createNodeTask("hostname") 	| false
+        createNodeTaskList("missinghost")                  | createNodeTask("hostname") 	| true
+        createNodeTaskList("missinghost1", "missinghost2") | createNodeTask("missinghost3")	| true
+        createNodeTaskList("missinghost1", "hostname")     | createNodeTask("hostname")		| false
 
     }
 
@@ -39,8 +39,9 @@ class SchedulerUtilsSpec extends Specification {
 
 
     NodeTask createNodeTask(String hostname) {
-        def node = new NodeTask(new NMProfile("", 1, 1), null)
+        def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0,1.0), null)
         node.hostname = hostname
+        node.taskPrefix = "nm"
         node
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
new file mode 100644
index 0000000..e7b77f1
--- /dev/null
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
@@ -0,0 +1,75 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.scheduler;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.state.NodeTask;
+
+/**
+ * Test implementation of TaskFactory
+ *
+ */
+public class TMSTaskFactoryImpl implements TaskFactory {
+
+  private MyriadConfiguration cfg;
+  private TaskUtils taskUtils;
+
+  @Inject
+  public TMSTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
+      this.setCfg(cfg);
+      this.setTaskUtils(taskUtils);
+  }
+
+  @Override
+  public TaskInfo createTask(Offer offer, FrameworkID frameworkId, 
+      TaskID taskId, NodeTask nodeTask) {
+    return null;
+  }
+
+  public MyriadConfiguration getCfg() {
+    return cfg;
+  }
+
+  public void setCfg(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  public TaskUtils getTaskUtils() {
+    return taskUtils;
+  }
+
+  public void setTaskUtils(TaskUtils taskUtils) {
+    this.taskUtils = taskUtils;
+  }
+
+  @Override
+  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId,
+      Offer offer, CommandInfo commandInfo) {
+    return null;
+  }
+}



Mime
View raw message