myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [18/20] incubator-myriad git commit: com.ebay => org.apache
Date Wed, 28 Oct 2015 16:07:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
deleted file mode 100644
index 9e2c063..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.google.gson.Gson;
-
-/**
- * Extended ServiceResourceProfile for services that need to pass set of resources downstream
- * currently the only such service is NodeManager
- */
-public class ExtendedResourceProfile extends ServiceResourceProfile {
-
-  private NMProfile childProfile;
-
-  /**
-   * @param childProfile - should be null
-   * @param cpu
-   * @param mem          will throw NullPoiterException if childProfile is null
-   */
-  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) {
-    super(childProfile.getName(), cpu, mem);
-    this.childProfile = childProfile;
-    this.className = ExtendedResourceProfile.class.getName();
-  }
-
-  public NMProfile getChildProfile() {
-    return childProfile;
-  }
-
-  public void setChildProfile(NMProfile nmProfile) {
-    this.childProfile = nmProfile;
-  }
-
-  @Override
-  public String getName() {
-    return childProfile.getName();
-  }
-
-  @Override
-  public Double getCpus() {
-    return childProfile.getCpus().doubleValue();
-  }
-
-  @Override
-  public Double getMemory() {
-    return childProfile.getMemory().doubleValue();
-  }
-
-  @Override
-  public Double getAggregateMemory() {
-    return memory + childProfile.getMemory();
-  }
-
-  @Override
-  public Double getAggregateCpu() {
-    return cpus + childProfile.getCpus();
-  }
-
-  @Override
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
deleted file mode 100644
index 83c4cd2..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import javax.inject.Inject;
-
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Driver for Myriad scheduler.
- */
-public class MyriadDriver {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class);
-
-  private final SchedulerDriver driver;
-
-  @Inject
-  public MyriadDriver(SchedulerDriver driver) {
-    this.driver = driver;
-  }
-
-  public Status start() {
-    LOGGER.info("Starting driver");
-    Status status = driver.start();
-    LOGGER.info("Driver started with status: {}", status);
-    return status;
-  }
-
-  public Status kill(final TaskID taskId) {
-    LOGGER.info("Killing task {}", taskId);
-    Status status = driver.killTask(taskId);
-    LOGGER.info("Task {} killed with status: {}", taskId, status);
-    return status;
-  }
-
-  public Status abort() {
-    LOGGER.info("Aborting driver");
-    Status status = driver.abort();
-    LOGGER.info("Driver aborted with status: {}", status);
-    return status;
-  }
-
-  public SchedulerDriver getDriver() {
-    return driver;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
deleted file mode 100644
index 4d669f4..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Manager for the myriad scheduler driver
- */
-public class MyriadDriverManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriverManager.class);
-  private final Lock driverLock;
-  private MyriadDriver driver;
-  private Status driverStatus;
-
-  @Inject
-  public MyriadDriverManager(MyriadDriver driver) {
-    this.driver = driver;
-    this.driverLock = new ReentrantLock();
-    this.driverStatus = Protos.Status.DRIVER_NOT_STARTED;
-  }
-
-  public Status startDriver() {
-    this.driverLock.lock();
-    try {
-      Preconditions.checkState(this.isStartable());
-      LOGGER.info("Starting driver...");
-      this.driverStatus = driver.start();
-      LOGGER.info("Driver started with status: {}", this.driverStatus);
-    } finally {
-      this.driverLock.unlock();
-    }
-    return this.driverStatus;
-  }
-
-  public Status stopDriver() {
-    this.driverLock.lock();
-    try {
-      if (isRunning()) {
-        LOGGER.info("Aborting driver...");
-        this.driverStatus = this.driver.abort();
-        LOGGER.info("Aborted driver with status: {}", this.driverStatus);
-      }
-    } finally {
-      this.driverLock.unlock();
-    }
-    return driverStatus;
-  }
-
-  public Status kill(final TaskID taskId) {
-    LOGGER.info("Killing task {}", taskId);
-    this.driverLock.lock();
-    try {
-      if (isRunning()) {
-        this.driverStatus = driver.kill(taskId);
-        LOGGER.info("Task {} killed with status: {}", taskId, this.driverStatus);
-      } else {
-        LOGGER.warn("Cannot kill task, driver is not running");
-      }
-    } finally {
-      this.driverLock.unlock();
-    }
-
-    return driverStatus;
-  }
-
-  public Status getDriverStatus() {
-    return this.driverStatus;
-  }
-
-  private boolean isStartable() {
-    return this.driver != null && this.driverStatus == Status.DRIVER_NOT_STARTED;
-  }
-
-  private boolean isRunning() {
-    return this.driver != null && this.driverStatus == Status.DRIVER_RUNNING;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
deleted file mode 100644
index 25369df..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.configuration.MyriadBadConfigurationException;
-import com.ebay.myriad.configuration.ServiceConfiguration;
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-import com.ebay.myriad.policy.NodeScaleDownPolicy;
-import com.ebay.myriad.scheduler.constraints.Constraint;
-import com.ebay.myriad.scheduler.constraints.LikeConstraint;
-import com.ebay.myriad.state.NodeTask;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Myriad scheduler operations
- */
-public class MyriadOperations {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
-  private final SchedulerState schedulerState;
-
-  private MyriadConfiguration cfg;
-  private NodeScaleDownPolicy nodeScaleDownPolicy;
-
-  @Inject
-  public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy) {
-    this.cfg = cfg;
-    this.schedulerState = schedulerState;
-    this.nodeScaleDownPolicy = nodeScaleDownPolicy;
-  }
-
-  public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
-    Collection<NodeTask> nodes = new HashSet<>();
-    for (int i = 0; i < instances; i++) {
-      NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
-      nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
-      nodes.add(nodeTask);
-    }
-
-    LOGGER.info("Adding {} NM instances to cluster", nodes.size());
-    this.schedulerState.addNodes(nodes);
-  }
-
-  public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, Constraint constraint, int numInstancesToScaleDown) {
-    // Flex down Pending tasks, if any
-    int numPendingTasksScaledDown = flexDownPendingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown);
-
-    // Flex down Staging tasks, if any
-    int numStagingTasksScaledDown = flexDownStagingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
-
-    // Flex down Active tasks, if any
-    int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
-
-    if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {
-      LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
-    } else {
-      LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with " + "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceResourceProfile.getName(),
-          constraint == null ? "null" : constraint.toString());
-    }
-  }
-
-  /**
-   * Flexup a service
-   *
-   * @param instances
-   * @param serviceName
-   */
-  public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException {
-    final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName);
-
-    int totalflexInstances = instances + getFlexibleInstances(serviceName);
-    Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
-    if (maxInstances != null && maxInstances > 0) {
-      // check number of instances
-      // sum of active, staging, pending should be < maxInstances
-      if (totalflexInstances > maxInstances) {
-        LOGGER.error("Current number of active, staging, pending and requested instances: {}" + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances);
-        throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: " + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances);
-      }
-    }
-
-    final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
-    final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
-
-    Collection<NodeTask> nodes = new HashSet<>();
-    for (int i = 0; i < instances; i++) {
-      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
-      nodeTask.setTaskPrefix(serviceName);
-      nodes.add(nodeTask);
-    }
-
-    LOGGER.info("Adding {} {} instances to cluster", nodes.size(), serviceName);
-    this.schedulerState.addNodes(nodes);
-  }
-
-  /**
-   * Flexing down any service defined in the configuration
-   *
-   * @param numInstancesToScaleDown
-   * @param serviceName             - name of the service
-   */
-  public void flexDownAService(int numInstancesToScaleDown, String serviceName) {
-    LOGGER.info("About to flex down {} instances of {}", numInstancesToScaleDown, serviceName);
-
-    int numScaledDown = 0;
-
-    // Flex down Pending tasks, if any
-    if (numScaledDown < numInstancesToScaleDown) {
-      Collection<Protos.TaskID> pendingTasks = this.schedulerState.getPendingTaskIds(serviceName);
-
-      for (Protos.TaskID taskId : pendingTasks) {
-        this.schedulerState.makeTaskKillable(taskId);
-        numScaledDown++;
-        if (numScaledDown >= numInstancesToScaleDown) {
-          break;
-        }
-      }
-    }
-    int numPendingTasksScaledDown = numScaledDown;
-
-    // Flex down Staging tasks, if any
-    if (numScaledDown < numInstancesToScaleDown) {
-      Collection<Protos.TaskID> stagingTasks = this.schedulerState.getStagingTaskIds(serviceName);
-
-      for (Protos.TaskID taskId : stagingTasks) {
-        this.schedulerState.makeTaskKillable(taskId);
-        numScaledDown++;
-        if (numScaledDown >= numInstancesToScaleDown) {
-          break;
-        }
-      }
-    }
-    int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;
-
-    Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName);
-    if (numScaledDown < numInstancesToScaleDown) {
-      for (NodeTask nodeTask : activeTasks) {
-        this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-        numScaledDown++;
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("Marked NodeTask {} on host {} for kill.", nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
-        }
-        if (numScaledDown >= numInstancesToScaleDown) {
-          break;
-        }
-      }
-    }
-
-    LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}", numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName);
-  }
-
-  private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
-  }
-
-  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
-  }
-
-  private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-    if (numInstancesToScaleDown > 0) {
-      List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
-      nodeScaleDownPolicy.apply(activeTasksForProfile);
-      return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown);
-    }
-    return 0;
-  }
-
-  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-    int numInstancesScaledDown = 0;
-    for (Protos.TaskID taskID : taskIDs) {
-      NodeTask nodeTask = schedulerState.getTask(taskID);
-      if (nodeTask.getProfile().getName().equals(profile.getName()) && meetsConstraint(nodeTask, constraint)) {
-        this.schedulerState.makeTaskKillable(taskID);
-        numInstancesScaledDown++;
-        if (numInstancesScaledDown == numInstancesToScaleDown) {
-          break;
-        }
-      }
-    }
-    return numInstancesScaledDown;
-  }
-
-  private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
-    if (constraint != null) {
-      if (constraint.equals(nodeTask.getConstraint())) {
-        return true;
-      }
-      switch (constraint.getType()) {
-        case LIKE:
-          LikeConstraint likeConstraint = (LikeConstraint) constraint;
-          if (likeConstraint.isConstraintOnHostName()) {
-            return likeConstraint.matchesHostName(nodeTask.getHostname());
-          } else {
-            return likeConstraint.matchesSlaveAttributes(nodeTask.getSlaveAttributes());
-          }
-
-        default:
-          return false;
-      }
-    }
-    return true;
-  }
-
-  public Integer getFlexibleInstances(String taskPrefix) {
-    return this.schedulerState.getActiveTaskIds(taskPrefix).size() + this.schedulerState.getStagingTaskIds(taskPrefix).size() + this.schedulerState.getPendingTaskIds(taskPrefix).size();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
deleted file mode 100644
index db44648..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.DisruptorManager;
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.scheduler.event.DisconnectedEvent;
-import com.ebay.myriad.scheduler.event.ErrorEvent;
-import com.ebay.myriad.scheduler.event.ExecutorLostEvent;
-import com.ebay.myriad.scheduler.event.FrameworkMessageEvent;
-import com.ebay.myriad.scheduler.event.OfferRescindedEvent;
-import com.ebay.myriad.scheduler.event.ReRegisteredEvent;
-import com.ebay.myriad.scheduler.event.RegisteredEvent;
-import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
-import com.ebay.myriad.scheduler.event.SlaveLostEvent;
-import com.ebay.myriad.scheduler.event.StatusUpdateEvent;
-import com.lmax.disruptor.EventTranslator;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import javax.inject.Inject;
-import java.util.List;
-
-/**
- * Myriad Scheduler
- */
-public class MyriadScheduler implements Scheduler {
-  private DisruptorManager disruptorManager;
-
-  @Inject
-  public MyriadScheduler(final MyriadConfiguration cfg, final DisruptorManager disruptorManager) {
-    this.disruptorManager = disruptorManager;
-  }
-
-  @Override
-  public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
-    disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<RegisteredEvent>() {
-      @Override
-      public void translateTo(RegisteredEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setFrameworkId(frameworkId);
-        event.setMasterInfo(masterInfo);
-      }
-    });
-  }
-
-  @Override
-  public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
-    disruptorManager.getReRegisteredEventDisruptor().publishEvent(new EventTranslator<ReRegisteredEvent>() {
-      @Override
-      public void translateTo(ReRegisteredEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setMasterInfo(masterInfo);
-      }
-    });
-  }
-
-  @Override
-  public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
-    disruptorManager.getResourceOffersEventDisruptor().publishEvent(new EventTranslator<ResourceOffersEvent>() {
-      @Override
-      public void translateTo(ResourceOffersEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setOffers(offers);
-      }
-    });
-  }
-
-  @Override
-  public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
-    disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new EventTranslator<OfferRescindedEvent>() {
-      @Override
-      public void translateTo(OfferRescindedEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setOfferId(offerId);
-      }
-    });
-  }
-
-  @Override
-  public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
-    disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new EventTranslator<StatusUpdateEvent>() {
-      @Override
-      public void translateTo(StatusUpdateEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setStatus(status);
-      }
-    });
-  }
-
-  @Override
-  public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) {
-    disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(new EventTranslator<FrameworkMessageEvent>() {
-      @Override
-      public void translateTo(FrameworkMessageEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setBytes(bytes);
-        event.setExecutorId(executorId);
-        event.setSlaveId(slaveId);
-      }
-    });
-  }
-
-  @Override
-  public void disconnected(final SchedulerDriver driver) {
-    disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<DisconnectedEvent>() {
-      @Override
-      public void translateTo(DisconnectedEvent event, long sequence) {
-        event.setDriver(driver);
-      }
-    });
-  }
-
-  @Override
-  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
-    disruptorManager.getSlaveLostEventDisruptor().publishEvent(new EventTranslator<SlaveLostEvent>() {
-      @Override
-      public void translateTo(SlaveLostEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setSlaveId(slaveId);
-      }
-    });
-  }
-
-  @Override
-  public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) {
-    disruptorManager.getExecutorLostEventDisruptor().publishEvent(new EventTranslator<ExecutorLostEvent>() {
-      @Override
-      public void translateTo(ExecutorLostEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setExecutorId(executorId);
-        event.setSlaveId(slaveId);
-        event.setExitStatus(exitStatus);
-      }
-    });
-  }
-
-  @Override
-  public void error(final SchedulerDriver driver, final String message) {
-    disruptorManager.getErrorEventDisruptor().publishEvent(new EventTranslator<ErrorEvent>() {
-      @Override
-      public void translateTo(ErrorEvent event, long sequence) {
-        event.setDriver(driver);
-        event.setMessage(message);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
deleted file mode 100644
index 55bdf79..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.ebay.myriad.scheduler;
-
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-
-/**
- * Implementation assumes NM binaries already deployed
- */
-public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
-
-  public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
-  public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
-  public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
-
-  /**
-   * YARN container executor class.
-   */
-  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
-  // TODO (mohit): Should it be configurable ?
-  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
-  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
-
-  /**
-   * YARN class to help handle LCE resources
-   */
-  public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
-
-  // TODO (mohit): Should it be configurable ?
-  public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY = "mesos/$TASK_DIR";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
-  public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "/sys/fs/cgroup";
-  public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
-  public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
-  public static final String KEY_YARN_HOME = "yarn.home";
-  public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
-  public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
-  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
-  public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
-  public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
-  public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
-  public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
-
-  private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
-  private static final String PROPERTY_FORMAT = "-D%s=%s";
-
-  private Map<String, String> environment = new HashMap<>();
-  protected MyriadConfiguration cfg;
-
-  public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
-    this.cfg = cfg;
-  }
-
-  @Override
-  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
-    StringBuilder cmdLine = new StringBuilder();
-
-    generateEnvironment(profile, (NMPorts) ports);
-    appendCgroupsCmds(cmdLine);
-    appendYarnHomeExport(cmdLine);
-    appendEnvForNM(cmdLine);
-    cmdLine.append(YARN_NM_CMD);
-    return cmdLine.toString();
-  }
-
-  protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
-    //yarnEnvironemnt configuration from yaml file
-    Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
-    if (yarnEnvironmentMap != null) {
-      environment.putAll(yarnEnvironmentMap);
-    }
-
-    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
-    if (rmHostName != null && !rmHostName.isEmpty()) {
-      addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
-    }
-
-    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
-      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS);
-
-      // TODO: Configure hierarchy
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
-      // TODO: Make it configurable
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
-      if (environment.containsKey("YARN_HOME")) {
-        addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
-      }
-    } else {
-      // Otherwise configure to use Default
-      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
-    }
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getRpcPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.valueOf(ports.getShufflePort()).toString());
-  }
-
-  protected void appendEnvForNM(StringBuilder cmdLine) {
-    cmdLine.append(" env ");
-    for (Map.Entry<String, String> env : environment.entrySet()) {
-      cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
-    }
-  }
-
-  protected void appendCgroupsCmds(StringBuilder cmdLine) {
-    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
-      cmdLine.append(" export TASK_DIR=`basename $PWD`;");
-      cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;");
-    }
-  }
-
-  protected void appendYarnHomeExport(StringBuilder cmdLine) {
-    if (environment.containsKey("YARN_HOME")) {
-      cmdLine.append(" export YARN_HOME=" + environment.get("YARN_HOME") + ";");
-    }
-  }
-
-  protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
-    String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
-    if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
-      String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
-      environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
-    } else {
-      environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
-    }
-  }
-
-  @Override
-  public String getConfigurationUrl() {
-    YarnConfiguration conf = new YarnConfiguration();
-    String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
-    if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
-      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
-      if (address == null || address.isEmpty()) {
-        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
-      }
-      return "https://" + address + "/conf";
-    } else {
-      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
-      if (address == null || address.isEmpty()) {
-        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
-      }
-      return "http://" + address + "/conf";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
deleted file mode 100644
index 39a9369..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Helper class for dynamically assigning ports to nodemanager
- */
-public class NMPorts implements Ports {
-  private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
-  private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
-  private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
-  private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
-
-  private static final String[] NM_PORT_KEYS = {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY};
-
-  private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
-
-  public NMPorts(Long[] ports) {
-    Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
-    for (int i = 0; i < NM_PORT_KEYS.length; i++) {
-      portsMap.put(NM_PORT_KEYS[i], ports[i]);
-    }
-  }
-
-  public long getRpcPort() {
-    return portsMap.get(NM_RPC_PORT_KEY);
-  }
-
-  public long getLocalizerPort() {
-    return portsMap.get(NM_LOCALIZER_PORT_KEY);
-  }
-
-  public long getWebAppHttpPort() {
-    return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY);
-  }
-
-  public long getShufflePort() {
-    return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY);
-  }
-
-  public static int expectedNumPorts() {
-    return NM_PORT_KEYS.length;
-  }
-
-  /**
-   * @return a string representation of NMPorts
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder().append("{");
-    for (String key : NM_PORT_KEYS) {
-      sb.append(key).append(": ").append(portsMap.get(key).toString()).append(", ");
-    }
-    sb.replace(sb.length() - 2, sb.length(), "}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
deleted file mode 100644
index aed0e06..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.google.gson.Gson;
-
-/**
- * Node Manager Profile
- */
-public class NMProfile {
-  private String name;
-
-  /**
-   * Number of CPU advertised to YARN Resource Manager.
-   */
-  private Long cpus;
-
-  /**
-   * Memory in MB advertised to YARN Resource Manager.
-   */
-  private Long memory;
-
-  public NMProfile(String name, Long cpus, Long memory) {
-    this.name = name;
-    this.cpus = cpus;
-    this.memory = memory;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public Long getCpus() {
-    return cpus;
-  }
-
-  public Long getMemory() {
-    return memory;
-  }
-
-  @Override
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
deleted file mode 100644
index c6829b1..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.google.gson.Gson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Node Manager Profile Manager
- */
-public class NMProfileManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(NMProfileManager.class);
-
-  private Map<String, NMProfile> profiles = new ConcurrentHashMap<>();
-
-  public NMProfile get(String name) {
-    return profiles.get(name);
-  }
-
-  public void add(NMProfile profile) {
-    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profile.getName(), profile.getCpus(), profile.getMemory());
-
-    profiles.put(profile.getName(), profile);
-  }
-
-  public boolean exists(String name) {
-    return this.profiles.containsKey(name);
-  }
-
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
deleted file mode 100644
index 29d961b..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad.scheduler;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Target;
-import java.lang.annotation.Retention;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-
-/**
- * NMTaskFactory annotation that allows to bind TaskFactory to NM specific implementation
- */
-@BindingAnnotation
-@Target({FIELD, PARAMETER, METHOD})
-@Retention(RUNTIME)
-public @interface NMTaskFactoryAnnotation {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
deleted file mode 100644
index b862e75..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-/**
- * Generic interface to represent ports
- */
-public interface Ports {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
deleted file mode 100644
index 6747c5a..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.state.SchedulerState;
-import javax.inject.Inject;
-import java.util.Set;
-
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Rebalancer} is responsible for scaling registered YARN clusters as per
- * configured rules and policies.
- */
-public class Rebalancer implements Runnable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class);
-
-  private final SchedulerState schedulerState;
-  private final MyriadOperations myriadOperations;
-  private final ServiceProfileManager profileManager;
-
-  @Inject
-  public Rebalancer(SchedulerState schedulerState, MyriadOperations myriadOperations, ServiceProfileManager profileManager) {
-    this.schedulerState = schedulerState;
-    this.myriadOperations = myriadOperations;
-    this.profileManager = profileManager;
-  }
-
-  @Override
-  public void run() {
-    final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
-    final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
-    LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
-    if (activeIds.size() < 1 && pendingIds.size() < 1) {
-      myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
-    }
-    //            RestAdapter restAdapter = new RestAdapter.Builder()
-    //                    .setEndpoint("http://" + host + ":" + port)
-    //                    .setLogLevel(LogLevel.FULL).build();
-    //            YARNResourceManagerService service = restAdapter
-    //                    .create(YARNResourceManagerService.class);
-    //
-    //            ClusterMetrics metrics = service.metrics().getClusterMetrics();
-    //            AppsResponse appsResponse = service.apps("ACCEPTED");
-    //
-    //            int acceptedApps = 0;
-    //
-    //            if (appsResponse == null || appsResponse.getApps() == null
-    //                    || appsResponse.getApps().getApps() == null) {
-    //                acceptedApps = 0;
-    //            } else {
-    //                acceptedApps = appsResponse.getApps().getApps().size();
-    //            }
-    //            LOGGER.info("Metrics: {}", metrics);
-    //            LOGGER.info("Apps: {}", appsResponse);
-    //
-    //            long availableMB = metrics.getAvailableMB();
-    //            long allocatedMB = metrics.getAllocatedMB();
-    //            long reservedMB = metrics.getReservedMB();
-    //            int activeNodes = metrics.getActiveNodes();
-    //            int unhealthyNodes = metrics.getUnhealthyNodes();
-    //            int appsPending = metrics.getAppsPending();
-    //            int appsRunning = metrics.getAppsRunning();
-
-    //            if (activeNodes == 0 && appsPending > 0) {
-    //                LOGGER.info(
-    //                        "Flexing up for condition: activeNodes ({}) == 0 && appsPending ({}) > 0",
-    //                        activeNodes, appsPending);
-    //                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
-    //            } else if (appsPending == 0 && appsRunning == 0 && activeNodes > 0) {
-    //                LOGGER.info(
-    //                        "Flexing down for condition: appsPending ({}) == 0 && appsRunning ({}) == 0 && activeNodes ({}) > 0",
-    //                        appsPending, appsRunning, activeNodes);
-    //                this.myriadOperations.flexDownCluster(cluster, 1);
-    //            } else if (acceptedApps > 0) {
-    //                LOGGER.info("Flexing up for condition: acceptedApps ({}) > 0",
-    //                        acceptedApps);
-    //                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
-    //            } else {
-    //                LOGGER.info("Nothing to rebalance");
-    //                this.schedulerState.releaseLock(clusterId);
-    //            }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
deleted file mode 100644
index be47bef..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.state.SchedulerState;
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-
-/**
- * {@link ReconcileService} is responsible for reconciling tasks with the mesos master
- */
-public class ReconcileService {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ReconcileService.class);
-
-  public static final long DEFAULT_RECONCILATION_DELAY_MS = 10000;
-  public static final long MAX_RECONCILE_ATTEMPTS = 10;
-
-  private SchedulerState state;
-  private MyriadConfiguration cfg;
-  private Date lastReconcileTime;
-
-  @Inject
-  public ReconcileService(SchedulerState state, MyriadConfiguration cfg) {
-    this.state = state;
-    this.cfg = cfg;
-  }
-
-  public void reconcile(SchedulerDriver driver) {
-    Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
-
-    if (taskStatuses.size() == 0) {
-      return;
-    }
-    LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
-
-    driver.reconcileTasks(taskStatuses);
-
-    lastReconcileTime = new Date();
-
-    int attempt = 1;
-
-    while (attempt <= MAX_RECONCILE_ATTEMPTS) {
-      try {
-        // TODO(mohit): Using exponential backoff here, maybe backoff strategy should be configurable.
-        Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
-      } catch (InterruptedException e) {
-        LOGGER.error("Interrupted", e);
-      }
-      Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
-      for (Protos.TaskStatus status : state.getTaskStatuses()) {
-        if (status.getTimestamp() < lastReconcileTime.getTime()) {
-          notYetReconciled.add(status);
-        }
-      }
-      LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
-      driver.reconcileTasks(notYetReconciled);
-      lastReconcileTime = new Date();
-      attempt++;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
deleted file mode 100644
index cda1c0e..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-import com.ebay.myriad.state.NodeTask;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-
-/**
- * Provides utilities for scheduling with the mesos offers
- */
-public class SchedulerUtils {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class);
-
-  public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, NodeTask taskToLaunch, Collection<NodeTask> tasks) {
-    Preconditions.checkArgument(offer != null);
-    String offerHostname = offer.getHostname();
-
-    if (!CollectionUtils.isEmpty(tasks)) {
-      for (NodeTask task : tasks) {
-        if (offerHostname.equalsIgnoreCase(task.getHostname())) {
-          LOGGER.debug("Offer's hostname {} is not unique", offerHostname);
-          return false;
-        }
-      }
-    }
-    LOGGER.debug("Offer's hostname {} is unique", offerHostname);
-    return true;
-  }
-
-  /**
-   * Determines if a given host has a nodemanager running with zero profile. Node Managers
-   * launched with zero profile (zero cpu & memory) are eligible for fine grained scaling.
-   * Node Managers launched with a non-zero profile size are not eligible for fine grained scaling.
-   *
-   * @param hostName
-   * @return
-   */
-  public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
-    for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
-      if (activeNMTask.getProfile().getCpus() == 0 &&
-          activeNMTask.getProfile().getMemory() == 0 &&
-          activeNMTask.getHostname().equals(hostName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
deleted file mode 100644
index aa8c1b6..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-
-/**
- * CommandLineGenerator for any aux service launched by Myriad as a binary distro
- */
-public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl {
-
-
-  public ServiceCommandLineGenerator(MyriadConfiguration cfg, String nodeManagerUri) {
-    super(cfg, nodeManagerUri);
-  }
-
-  @Override
-  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
-    StringBuilder strB = new StringBuilder();
-    appendDistroExtractionCommands(strB);
-    appendUser(strB);
-    return strB.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
deleted file mode 100644
index 8ee3bc1..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-/**
- * Class to keep all the ServiceResourceProfiles together
- */
-public class ServiceProfileManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProfileManager.class);
-
-  private Map<String, ServiceResourceProfile> profiles = new ConcurrentHashMap<>();
-
-  public ServiceResourceProfile get(String name) {
-    return profiles.get(name);
-  }
-
-  public void add(ServiceResourceProfile profile) {
-    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profile.getName(), profile.getCpus(), profile.getMemory());
-    profiles.put(profile.getName(), profile);
-  }
-
-  public boolean exists(String name) {
-    return this.profiles.containsKey(name);
-  }
-
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
deleted file mode 100644
index 2b36b22..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import java.lang.reflect.Type;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParseException;
-
-/**
- * Resource Profile for any service
- */
-public class ServiceResourceProfile {
-
-  protected final String name;
-
-  /**
-   * Number of CPUs needed to run a service
-   */
-  protected final Double cpus;
-
-  /**
-   * Memory in MB needed to run a service
-   */
-  protected final Double memory;
-
-  protected Double executorCpu = 0.0;
-
-  protected Double executorMemory = 0.0;
-
-  protected String className;
-
-  public ServiceResourceProfile(String name, Double cpu, Double mem) {
-    this.name = name;
-    this.cpus = cpu;
-    this.memory = mem;
-    this.className = ServiceResourceProfile.class.getName();
-  }
-
-
-  public String getName() {
-    return name;
-  }
-
-  public Double getCpus() {
-    return cpus;
-  }
-
-  public Double getMemory() {
-    return memory;
-  }
-
-  public Double getAggregateMemory() {
-    return memory;
-  }
-
-  public Double getAggregateCpu() {
-    return cpus;
-  }
-
-  public Double getExecutorCpu() {
-    return executorCpu;
-  }
-
-  public void setExecutorCpu(Double executorCpu) {
-    this.executorCpu = executorCpu;
-  }
-
-  public Double getExecutorMemory() {
-    return executorMemory;
-  }
-
-  public void setExecutorMemory(Double executorMemory) {
-    this.executorMemory = executorMemory;
-  }
-
-
-  @Override
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-
-  /**
-   * Custom deserializer to help with deserialization of the class hierarchy,
-   * since at the point of deserialization we don't know the class of the data
-   * that is being deserialized
-   */
-  public static class CustomDeserializer implements JsonDeserializer<ServiceResourceProfile> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CustomDeserializer.class);
-
-    @Override
-    public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
-      String type = json.getAsJsonObject().get("className").getAsString();
-      try {
-        @SuppressWarnings("rawtypes") Class c = Class.forName(type);
-        if (ServiceResourceProfile.class.equals(c)) {
-          return new Gson().fromJson(json, typeOfT);
-        }
-        ServiceResourceProfile profile = context.deserialize(json, c);
-        return profile;
-      } catch (ClassNotFoundException e) {
-        LOGGER.error("Classname is not found", e);
-      }
-      return null;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
deleted file mode 100644
index 45e5327..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import java.util.Map;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.configuration.ServiceConfiguration;
-
-/**
- * ServiceTaskConstraints is an implementation of TaskConstraints for a service.
- * At this point the only constraints are on ports.
- * Other types of constraints may be added later on.
- */
-public class ServiceTaskConstraints implements TaskConstraints {
-
-  private int portsCount;
-
-  public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) {
-    this.portsCount = 0;
-    Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations();
-    if (auxConfigs == null) {
-      return;
-    }
-    ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix);
-    if (serviceConfig != null) {
-      if (serviceConfig.getPorts().isPresent()) {
-        this.portsCount = serviceConfig.getPorts().get().size();
-      }
-    }
-  }
-
-  @Override
-  public int portsCount() {
-    return portsCount;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
deleted file mode 100644
index ead2326..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskFactoryImpl.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import javax.inject.Inject;
-
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.Value.Scalar;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
-import com.ebay.myriad.configuration.ServiceConfiguration;
-import com.ebay.myriad.state.NodeTask;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Generic Service Class that allows creating a service based solely on the configuration
- * Main properties of configuration are:
- * 1. command to run
- * 2. Additional env. variables to set (serviceOpts)
- * 3. ports to use with names of the properties
- * 4. TODO (yufeldman) executor info
- */
-public class ServiceTaskFactoryImpl implements TaskFactory {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskFactoryImpl.class);
-
-  public static final long DEFAULT_PORT_NUMBER = 0;
-
-  private MyriadConfiguration cfg;
-  @SuppressWarnings("unused")
-  private TaskUtils taskUtils;
-  private ServiceCommandLineGenerator clGenerator;
-
-  @Inject
-  public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
-    this.cfg = cfg;
-    this.taskUtils = taskUtils;
-    this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull());
-  }
-
-  @Override
-  public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) {
-    Objects.requireNonNull(offer, "Offer should be non-null");
-    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
-
-    ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix());
-
-    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
-    Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null");
-
-    final String serviceHostName = "0.0.0.0";
-    final String serviceEnv = serviceConfig.getEnvSettings();
-    final String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
-    List<Long> additionalPortsNumbers = null;
-
-    final StringBuilder strB = new StringBuilder("env ");
-    if (serviceConfig.getServiceOpts() != null) {
-      strB.append(serviceConfig.getServiceOpts()).append("=");
-
-      strB.append("\"");
-      if (rmHostName != null && !rmHostName.isEmpty()) {
-        strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " ");
-      }
-
-      Map<String, Long> ports = serviceConfig.getPorts().orNull();
-      if (ports != null && !ports.isEmpty()) {
-        int neededPortsCount = 0;
-        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
-          Long port = portEntry.getValue();
-          if (port == DEFAULT_PORT_NUMBER) {
-            neededPortsCount++;
-          }
-        }
-        // use provided ports
-        additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount);
-        LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", additionalPortsNumbers);
-        int index = 0;
-        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
-          String portProperty = portEntry.getKey();
-          Long port = portEntry.getValue();
-          if (port == DEFAULT_PORT_NUMBER) {
-            port = additionalPortsNumbers.get(index++);
-          }
-          strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port + " ");
-        }
-      }
-      strB.append(serviceEnv);
-      strB.append("\"");
-    }
-
-    strB.append(" ");
-    strB.append(serviceConfig.getCommand().get());
-
-    CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), strB.toString());
-
-    LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString());
-
-    Scalar taskMemory = Scalar.newBuilder().setValue(nodeTask.getProfile().getMemory()).build();
-    Scalar taskCpus = Scalar.newBuilder().setValue(nodeTask.getProfile().getCpus()).build();
-
-    TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
-
-    taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources(Resource.newBuilder()
-        .setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build());
-
-    if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) {
-      // set ports
-      Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder();
-      for (Long port : additionalPortsNumbers) {
-        valueRanger.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port));
-      }
-
-      taskBuilder.addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(valueRanger.build()));
-    }
-    taskBuilder.setCommand(commandInfo);
-    return taskBuilder.build();
-  }
-
-  @VisibleForTesting
-  CommandInfo createCommandInfo(ServiceResourceProfile profile, String executorCmd) {
-    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
-    CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
-    Map<String, String> envVars = cfg.getYarnEnvironment();
-    if (envVars != null && !envVars.isEmpty()) {
-      org.apache.mesos.Protos.Environment.Builder yarnHomeB = org.apache.mesos.Protos.Environment.newBuilder();
-      for (Map.Entry<String, String> envEntry : envVars.entrySet()) {
-        org.apache.mesos.Protos.Environment.Variable.Builder yarnEnvB = org.apache.mesos.Protos.Environment.Variable.newBuilder();
-        yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue());
-        yarnHomeB.addVariables(yarnEnvB.build());
-      }
-      commandInfo.mergeEnvironment(yarnHomeB.build());
-    }
-
-    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-      //Both frameworkUser and frameworkSuperUser must be set to get all of the directory permissions correct.
-      if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) {
-        throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!");
-      }
-
-      LOGGER.info("Using remote distribution");
-      String clGeneratedCommand = clGenerator.generateCommandLine(profile, null);
-
-      String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get();
-
-      //Concatenate all the subcommands
-      String cmd = clGeneratedCommand + " " + executorCmd;
-
-      //get the nodemanagerURI
-      //We're going to extract ourselves, so setExtract is false
-      LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
-      URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false).build();
-
-      //get configs directly from resource manager
-      String configUrlString = clGenerator.getConfigurationUrl();
-      LOGGER.info("Getting config from:" + configUrlString);
-      URI configUri = URI.newBuilder().setValue(configUrlString).build();
-
-      LOGGER.info("Slave will execute command:" + cmd);
-      commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd);
-      commandInfo.setUser(cfg.getFrameworkSuperUser().get());
-
-    } else {
-      commandInfo.setValue(executorCmd);
-    }
-    return commandInfo.build();
-  }
-
-  @Override
-  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) {
-    // TODO (yufeldman) if executor specified use it , otherwise return null
-    // nothing to implement here, since we are using default slave executor
-    return null;
-  }
-
-  /**
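-   * Helper method to pick the ports to use from the "ports" ranges of an offer
-   *
-   * @param offer          offer whose "ports" resources are scanned in order
-   * @param requestedPorts number of ports needed
-   * @return null if requestedPorts is 0; otherwise up to requestedPorts ports (fewer if the offer runs short)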
-   */
-  private List<Long> getAvailablePorts(Offer offer, int requestedPorts) {
-    if (requestedPorts == 0) {
-      return null;
-    }
-    final List<Long> returnedPorts = new ArrayList<>();
-    for (Resource resource : offer.getResourcesList()) {
-      if (resource.getName().equals("ports")) {
-        Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator();
-        while (itr.hasNext()) {
-          Value.Range range = itr.next();
-          if (range.getBegin() <= range.getEnd()) {
-            long i = range.getBegin();
-            while (i <= range.getEnd() && returnedPorts.size() < requestedPorts) {
-              returnedPorts.add(i);
-              i++;
-            }
-            if (returnedPorts.size() >= requestedPorts) {
-              return returnedPorts;
-            }
-          }
-        }
-      }
-    }
-    // this is actually an error condition - we did not have enough ports to use
-    return returnedPorts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
deleted file mode 100644
index 2af11a7..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraints.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad.scheduler;
-
-/**
- * Generic interface to represent constraints that a task can impose
- * while figuring out whether to accept or reject an offer.
- * We may start small and then eventually add more constraints.
- */
-public interface TaskConstraints {
-
-  /**
-   * Required number of ports
-   *
-   * @return portsNumber
-   */
-  public int portsCount();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
deleted file mode 100644
index d4973c2..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskConstraintsManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Factory class to keep map of the constraints
- */
-public class TaskConstraintsManager {
-
-  /**
-   * Since all additions happen at init time, there is no need to make this map concurrent.
-   * If that changes later, we may need to replace the HashMap with a concurrent map.
-   */
-  private Map<String, TaskConstraints> taskConstraintsMap = new HashMap<>();
-
-  public TaskConstraints getConstraints(String taskPrefix) {
-    return taskConstraintsMap.get(taskPrefix);
-  }
-
-  public void addTaskConstraints(final String taskPrefix, final TaskConstraints taskConstraints) {
-    if (taskConstraints != null) {
-      taskConstraintsMap.put(taskPrefix, taskConstraints);
-    }
-  }
-
-  public boolean exists(String taskPrefix) {
-    return taskConstraintsMap.containsKey(taskPrefix);
-  }
-}


Mime
View raw message