myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [16/20] incubator-myriad git commit: com.ebay => org.apache
Date Wed, 28 Oct 2015 16:07:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
deleted file mode 100644
index a961cfe..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.event.handlers;
-
-import com.ebay.myriad.scheduler.SchedulerUtils;
-import com.ebay.myriad.scheduler.ServiceResourceProfile;
-import com.ebay.myriad.scheduler.TaskConstraints;
-import com.ebay.myriad.scheduler.TaskConstraintsManager;
-import com.ebay.myriad.scheduler.TaskFactory;
-import com.ebay.myriad.scheduler.TaskUtils;
-import com.ebay.myriad.scheduler.constraints.Constraint;
-import com.ebay.myriad.scheduler.constraints.LikeConstraint;
-import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
-import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
-import com.ebay.myriad.state.NodeTask;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.common.collect.Sets;
-import com.lmax.disruptor.EventHandler;
-
-import java.util.Iterator;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * handles and logs resource offers events
- */
-public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEvent> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class);
-
-  private static final Lock driverOperationLock = new ReentrantLock();
-
-  private static final String RESOURCES_CPU_KEY = "cpus";
-  private static final String RESOURCES_MEM_KEY = "mem";
-  private static final String RESOURCES_PORTS_KEY = "ports";
-  private static final String RESOURCES_DISK_KEY = "disk";
-
-
-  @Inject
-  private SchedulerState schedulerState;
-
-  @Inject
-  private TaskUtils taskUtils;
-
-  @Inject
-  private Map<String, TaskFactory> taskFactoryMap;
-
-  @Inject
-  private OfferLifecycleManager offerLifecycleMgr;
-
-  @Inject
-  private TaskConstraintsManager taskConstraintsManager;
-
-  @Override
-  public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception {
-    SchedulerDriver driver = event.getDriver();
-    List<Offer> offers = event.getOffers();
-
-    // Sometimes, we see that mesos sends resource offers before Myriad receives
-    // a notification for "framework registration". This is a simple defensive code
-    // to not process any offers unless Myriad receives a "framework registered" notification.
-    if (schedulerState.getFrameworkID() == null) {
-      LOGGER.warn("Received {} offers, but declining them since Framework ID is not yet set", offers.size());
-      for (Offer offer : offers) {
-        driver.declineOffer(offer.getId());
-      }
-      return;
-    }
-    LOGGER.info("Received offers {}", offers.size());
-    LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
-    driverOperationLock.lock();
-    try {
-      for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) {
-        Offer offer = iterator.next();
-        Set<NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId());
-        for (NodeTask nodeTask : nodeTasks) {
-          nodeTask.setSlaveAttributes(offer.getAttributesList());
-        }
-        // keep this in case SchedulerState gets out of sync. This should not happen with 
-        // synchronizing addNodes method in SchedulerState
-        // but to keep it safe
-        final Set<Protos.TaskID> missingTasks = Sets.newHashSet();
-        Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
-        if (CollectionUtils.isNotEmpty(pendingTasks)) {
-          for (Protos.TaskID pendingTaskId : pendingTasks) {
-            NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId);
-            if (taskToLaunch == null) {
-              missingTasks.add(pendingTaskId);
-              LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId);
-              continue;
-            }
-            String taskPrefix = taskToLaunch.getTaskPrefix();
-            ServiceResourceProfile profile = taskToLaunch.getProfile();
-            Constraint constraint = taskToLaunch.getConstraint();
-
-            Set<NodeTask> launchedTasks = new HashSet<>();
-            launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix));
-            launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix));
-
-            if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) {
-              try {
-                final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch);
-                List<OfferID> offerIds = new ArrayList<>();
-                offerIds.add(offer.getId());
-                List<TaskInfo> tasks = new ArrayList<>();
-                tasks.add(task);
-                LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
-                LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
-                driver.launchTasks(offerIds, tasks);
-                schedulerState.makeTaskStaging(pendingTaskId);
-
-                // For every NM Task that we launch, we currently
-                // need to backup the ExecutorInfo for that NM Task in the State Store.
-                // Without this, we will not be able to launch tasks corresponding to yarn
-                // containers. This is specially important in case the RM restarts.
-                taskToLaunch.setExecutorInfo(task.getExecutor());
-                taskToLaunch.setHostname(offer.getHostname());
-                taskToLaunch.setSlaveId(offer.getSlaveId());
-                schedulerState.addTask(pendingTaskId, taskToLaunch);
-                iterator.remove(); // remove the used offer from offers list
-                break;
-              } catch (Throwable t) {
-                LOGGER.error("Exception thrown while trying to create a task for {}", taskPrefix, t);
-              }
-            }
-          }
-          for (Protos.TaskID taskId : missingTasks) {
-            schedulerState.removeTask(taskId);
-          }
-        }
-      }
-
-      for (Offer offer : offers) {
-        if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) {
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", offer.getHostname());
-          }
-          offerLifecycleMgr.addOffers(offer);
-        } else {
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname());
-          }
-          driver.declineOffer(offer.getId());
-        }
-      }
-    } finally {
-      driverOperationLock.unlock();
-    }
-  }
-
-  private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) {
-    if (!meetsConstraint(offer, constraint)) {
-      return false;
-    }
-    Map<String, Object> results = new HashMap<String, Object>(5);
-    //Assign default values to avoid NPE
-    results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0));
-    results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0));
-
-    for (Resource resource : offer.getResourcesList()) {
-      if (resourceEvaluators.containsKey(resource.getName())) {
-        resourceEvaluators.get(resource.getName()).eval(resource, results);
-      } else {
-        LOGGER.warn("Ignoring unknown resource type: {}", resource.getName());
-      }
-    }
-    double cpus = (Double) results.get(RESOURCES_CPU_KEY);
-    double mem = (Double) results.get(RESOURCES_MEM_KEY);
-    int ports = (Integer) results.get(RESOURCES_PORTS_KEY);
-
-    checkResource(cpus <= 0, RESOURCES_CPU_KEY);
-    checkResource(mem <= 0, RESOURCES_MEM_KEY);
-    checkResource(ports <= 0, RESOURCES_PORTS_KEY);
-
-    return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
-  }
-
-  private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) {
-    final ServiceResourceProfile profile = taskToLaunch.getProfile();
-    final String taskPrefix = taskToLaunch.getTaskPrefix();
-    final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu();
-    final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory();
-    final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix);
-    if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) {
-      return true;
-    } else {
-      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports);
-      return false;
-    }
-  }
-
-  private boolean meetsConstraint(Offer offer, Constraint constraint) {
-    if (constraint != null) {
-      switch (constraint.getType()) {
-        case LIKE: {
-          LikeConstraint likeConstraint = (LikeConstraint) constraint;
-          if (likeConstraint.isConstraintOnHostName()) {
-            return likeConstraint.matchesHostName(offer.getHostname());
-          } else {
-            return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
-          }
-        }
-        default:
-          return false;
-      }
-    }
-    return true;
-  }
-
-  private void checkResource(boolean fail, String resource) {
-    if (fail) {
-      LOGGER.info("No " + resource + " resources present");
-    }
-  }
-
-  private static Double scalarToDouble(Resource resource, String id) {
-    Double value = new Double(0.0);
-    if (resource.getType().equals(Value.Type.SCALAR)) {
-      value = new Double(resource.getScalar().getValue());
-    } else {
-      LOGGER.error(id + " resource was not a scalar: {}", resource.getType().toString());
-    }
-    return value;
-  }
-
-  private interface EvalResources {
-    public void eval(Resource resource, Map<String, Object> results);
-  }
-
-  private static Map<String, EvalResources> resourceEvaluators;
-
-  static {
-    resourceEvaluators = new HashMap<String, EvalResources>(4);
-    resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY));
-      }
-    });
-    resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY));
-      }
-    });
-    resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-      }
-    });
-    resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() {
-      public void eval(Resource resource, Map<String, Object> results) {
-        int ports = 0;
-        if (resource.getType().equals(Value.Type.RANGES)) {
-          Value.Ranges ranges = resource.getRanges();
-          for (Value.Range range : ranges.getRangeList()) {
-            if (range.getBegin() < range.getEnd()) {
-              ports += range.getEnd() - range.getBegin() + 1;
-            }
-          }
-        } else {
-          LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString());
-
-        }
-        results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports));
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
deleted file mode 100644
index bbd1292..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.event.handlers;
-
-import com.ebay.myriad.scheduler.event.SlaveLostEvent;
-import com.lmax.disruptor.EventHandler;
-import org.apache.mesos.Protos.SlaveID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * handles and logs mesos slave lost events
- */
-public class SlaveLostEventHandler implements EventHandler<SlaveLostEvent> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveLostEventHandler.class);
-
-  @Override
-  public void onEvent(SlaveLostEvent event, long sequence, boolean endOfBatch) throws Exception {
-    SlaveID slaveId = event.getSlaveId();
-    LOGGER.info("Slave {} lost!", slaveId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
deleted file mode 100644
index 34b8712..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.event.handlers;
-
-import com.ebay.myriad.scheduler.event.StatusUpdateEvent;
-import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
-import com.ebay.myriad.state.NodeTask;
-import com.ebay.myriad.state.SchedulerState;
-import com.lmax.disruptor.EventHandler;
-
-import javax.inject.Inject;
-
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * handles and logs mesos status update events
- */
-public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdateEventHandler.class);
-
-  private final SchedulerState schedulerState;
-  private final OfferLifecycleManager offerLifecycleManager;
-
-  @Inject
-  public StatusUpdateEventHandler(SchedulerState schedulerState, OfferLifecycleManager offerLifecycleManager) {
-    this.schedulerState = schedulerState;
-    this.offerLifecycleManager = offerLifecycleManager;
-  }
-
-  @Override
-  public void onEvent(StatusUpdateEvent event, long sequence, boolean endOfBatch) throws Exception {
-    TaskStatus status = event.getStatus();
-    this.schedulerState.updateTask(status);
-    TaskID taskId = status.getTaskId();
-    NodeTask task = schedulerState.getTask(taskId);
-    if (task == null) {
-      LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), status.getState());
-      schedulerState.removeTask(taskId);
-      return;
-    }
-    LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), status.getState());
-    TaskState state = status.getState();
-
-    switch (state) {
-      case TASK_STAGING:
-        schedulerState.makeTaskStaging(taskId);
-        break;
-      case TASK_STARTING:
-        schedulerState.makeTaskStaging(taskId);
-        break;
-      case TASK_RUNNING:
-        schedulerState.makeTaskActive(taskId);
-        break;
-      case TASK_FINISHED:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.removeTask(taskId);
-        break;
-      case TASK_FAILED:
-        // Add to pending tasks
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.makeTaskPending(taskId);
-        break;
-      case TASK_KILLED:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.removeTask(taskId);
-        break;
-      case TASK_LOST:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.makeTaskPending(taskId);
-        break;
-      default:
-        LOGGER.error("Invalid state: {}", state);
-        break;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java
deleted file mode 100644
index a1e8c33..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.mesos.Protos;
-
-/**
- * Represents offers from a slave that have been consumed by Myriad.
- */
-public class ConsumedOffer {
-  private List<Protos.Offer> offers;
-
-  public ConsumedOffer() {
-    this.offers = new LinkedList<>();
-  }
-
-  public void add(Protos.Offer offer) {
-    offers.add(offer);
-  }
-
-  public List<Protos.Offer> getOffers() {
-    return offers;
-  }
-
-  public Collection<Protos.OfferID> getOfferIds() {
-    Collection<Protos.OfferID> ids = new ArrayList<>(offers.size());
-
-    for (Protos.Offer offer : offers) {
-      ids.add(offer.getId());
-    }
-
-    return ids;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
deleted file mode 100644
index 87a727f..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import com.ebay.myriad.scheduler.MyriadDriver;
-import com.ebay.myriad.scheduler.SchedulerUtils;
-import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
-import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import javax.inject.Inject;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles node manager heartbeat.
- */
-public class NMHeartBeatHandler extends BaseInterceptor {
-  @VisibleForTesting
-  Logger logger = LoggerFactory.getLogger(NMHeartBeatHandler.class);
-
-  private final AbstractYarnScheduler yarnScheduler;
-  private final MyriadDriver myriadDriver;
-  private final YarnNodeCapacityManager yarnNodeCapacityMgr;
-  private final OfferLifecycleManager offerLifecycleMgr;
-  private final NodeStore nodeStore;
-  private final SchedulerState state;
-
-  @Inject
-  public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver, YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState
-      state) {
-
-    if (registry != null) {
-      registry.register(this);
-    }
-
-    this.yarnScheduler = yarnScheduler;
-    this.myriadDriver = myriadDriver;
-    this.yarnNodeCapacityMgr = yarnNodeCapacityMgr;
-    this.offerLifecycleMgr = offerLifecycleMgr;
-    this.nodeStore = nodeStore;
-    this.state = state;
-  }
-
-  @Override
-  public CallBackFilter getCallBackFilter() {
-    return new CallBackFilter() {
-      @Override
-      public boolean allowCallBacksForNode(NodeId nodeManager) {
-        return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state);
-      }
-    };
-  }
-
-  @Override
-  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
-    switch (event.getType()) {
-      case STARTED: {
-        RMNode rmNode = context.getRMNodes().get(event.getNodeId());
-        Resource totalCapability = rmNode.getTotalCapability();
-        if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) {
-          logger.warn("FineGrainedScaling feature got invoked for a " + "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability
-              .getVirtualCores());
-          totalCapability.setMemory(0);
-          totalCapability.setVirtualCores(0);
-        }
-      }
-        break;
-
-      case STATUS_UPDATE: {
-        handleStatusUpdate(event, context);
-      }
-        break;
-
-      default:
-        break;
-    }
-  }
-
-  @VisibleForTesting
-  protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
-    if (!(event instanceof RMNodeStatusEvent)) {
-      logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName());
-      return;
-    }
-
-    RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-    RMNode rmNode = context.getRMNodes().get(event.getNodeId());
-    String hostName = rmNode.getNodeID().getHost();
-
-    Node host = nodeStore.getNode(hostName);
-    if (host != null) {
-      host.snapshotRunningContainers();
-    }
-
-    // New capacity of the node =
-    // resources under use on the node (due to previous offers) +
-    // new resources offered by mesos for the node
-    yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(hostName)));
-  }
-
-  private Resource getNewResourcesOfferedByMesos(String hostname) {
-    OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
-    if (feed == null) {
-      logger.debug("No offer feed for: {}", hostname);
-      return Resource.newInstance(0, 0);
-    }
-    List<Offer> offers = new ArrayList<>();
-    Protos.Offer offer;
-    while ((offer = feed.poll()) != null) {
-      offers.add(offer);
-      offerLifecycleMgr.markAsConsumed(offer);
-    }
-    Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("NM on host {} got {} CPUs and {} memory from mesos", hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory());
-    }
-
-    return fromMesosOffers;
-  }
-
-  private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
-    Resource usedResources = Resource.newInstance(0, 0);
-    for (ContainerStatus status : statusEvent.getContainers()) {
-      if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) {
-        RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
-        // (sdaingade) This check is needed as RMContainer information may not be populated
-        // immediately after a RM restart.
-        if (rmContainer != null) {
-          Resources.addTo(usedResources, rmContainer.getAllocatedResource());
-        }
-      }
-    }
-    return usedResources;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java
deleted file mode 100644
index 5d9e1e1..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.mesos.Protos;
-
-/**
- * Abstraction that encapsulates YARN and Mesos view of a node.
- */
-public class Node {
-  /**
-   * Mesos slave id associated with this node.
-   */
-  private Protos.SlaveID slaveId;
-
-  /**
-   * Mesos executor on this node.
-   */
-  private Protos.ExecutorInfo execInfo;
-
-  /**
-   * YARN scheduler's representation of this node.
-   */
-  private SchedulerNode node;
-
-  /**
-   * Snapshot of containers allocated by YARN scheduler.
-   * This need not reflect the current state. It is meant to be used by the
-   * Myriad scheduler.
-   */
-  private Set<RMContainer> containerSnapshot;
-
-  public Node(SchedulerNode node) {
-    this.node = node;
-  }
-
-  public SchedulerNode getNode() {
-    return node;
-  }
-
-  public Protos.SlaveID getSlaveId() {
-    return slaveId;
-  }
-
-  public void setSlaveId(Protos.SlaveID slaveId) {
-    this.slaveId = slaveId;
-  }
-
-  public Protos.ExecutorInfo getExecInfo() {
-    return execInfo;
-  }
-
-  public void setExecInfo(Protos.ExecutorInfo execInfo) {
-    this.execInfo = execInfo;
-  }
-
-  public void snapshotRunningContainers() {
-    this.containerSnapshot = new HashSet<>(node.getRunningContainers());
-  }
-
-  public void removeContainerSnapshot() {
-    this.containerSnapshot = null;
-  }
-
-  public Set<RMContainer> getContainerSnapshot() {
-    return this.containerSnapshot;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java
deleted file mode 100644
index fd97e04..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
-/**
- * A store for all Node instances managed by this Myriad instance.
- */
-public class NodeStore {
-  private ConcurrentHashMap<String, Node> nodeMap;
-
-  public NodeStore() {
-    nodeMap = new ConcurrentHashMap<>(200, 0.75f, 50);
-  }
-
-  private String getKey(SchedulerNode schedNode) {
-    return schedNode.getNodeID().getHost();
-  }
-
-  public void add(SchedulerNode schedNode) {
-    nodeMap.put(getKey(schedNode), new Node(schedNode));
-  }
-
-  public void remove(String hostname) {
-    nodeMap.remove(hostname);
-  }
-
-  public Node getNode(String hostname) {
-    return nodeMap.get(hostname);
-  }
-
-  public boolean isPresent(String hostname) {
-    return nodeMap.containsKey(hostname);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java
deleted file mode 100644
index 487c571..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.mesos.Protos;
-
-/**
- * Feed of Mesos offers for a node.
- */
-public class OfferFeed {
-  private ConcurrentLinkedQueue<Protos.Offer> queue;
-
-  public OfferFeed() {
-    this.queue = new ConcurrentLinkedQueue<>();
-  }
-
-  public void add(Protos.Offer offer) {
-    queue.add(offer);
-  }
-
-  /**
-   * Retrieves and removes the head of the feed, or returns NULL if the feed is
-   * empty.
-   */
-  public Protos.Offer poll() {
-    return queue.poll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java
deleted file mode 100644
index 387f532..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import com.ebay.myriad.scheduler.MyriadDriver;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.inject.Inject;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the Mesos offers tracked by Myriad.
- */
-public class OfferLifecycleManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(OfferLifecycleManager.class);
-
-  private Map<String, OfferFeed> offerFeedMap;
-
-  /**
-   * !!! Not thread safe !!!
-   */
-  private final Map<String, ConsumedOffer> consumedOfferMap;
-
-  private final NodeStore nodeStore;
-  private final MyriadDriver myriadDriver;
-
-  @Inject
-  public OfferLifecycleManager(NodeStore nodeStore, MyriadDriver myriadDriver) {
-
-    this.offerFeedMap = new ConcurrentHashMap<>(200, 0.75f, 50);
-    this.consumedOfferMap = new HashMap<>(200, 0.75f);
-    this.nodeStore = nodeStore;
-    this.myriadDriver = myriadDriver;
-  }
-
-  public OfferFeed getOfferFeed(String hostname) {
-    return offerFeedMap.get(hostname);
-  }
-
-  public void declineOffer(Protos.Offer offer) {
-    myriadDriver.getDriver().declineOffer(offer.getId());
-    LOGGER.debug("Declined offer {}", offer.getId());
-  }
-
-  public void addOffers(Protos.Offer... offers) {
-    for (Protos.Offer offer : offers) {
-      String hostname = offer.getHostname();
-      Node node = nodeStore.getNode(hostname);
-      if (node != null) {
-        OfferFeed feed = offerFeedMap.get(hostname);
-        if (feed == null) {
-          feed = new OfferFeed();
-          offerFeedMap.put(hostname, feed);
-        }
-        feed.add(offer);
-
-        node.setSlaveId(offer.getSlaveId());
-
-        LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", hostname, offer.getId().getValue());
-      } else {
-        myriadDriver.getDriver().declineOffer(offer.getId());
-        LOGGER.debug("Declined offer for unregistered host {}", hostname);
-      }
-    }
-  }
-
-  public void markAsConsumed(Protos.Offer offer) {
-    ConsumedOffer consumedOffer = consumedOfferMap.get(offer.getHostname());
-    if (consumedOffer == null) {
-      consumedOffer = new ConsumedOffer();
-      consumedOfferMap.put(offer.getHostname(), consumedOffer);
-    }
-
-    consumedOffer.add(offer);
-  }
-
-  public ConsumedOffer drainConsumedOffer(String hostname) {
-    return consumedOfferMap.remove(hostname);
-  }
-
-  public void declineOutstandingOffers(String hostname) {
-    int numOutStandingOffers = 0;
-    OfferFeed offerFeed = getOfferFeed(hostname);
-    Offer offer;
-    while (offerFeed != null && (offer = offerFeed.poll()) != null) {
-      declineOffer(offer);
-      numOutStandingOffers++;
-    }
-    if (numOutStandingOffers > 0) {
-      LOGGER.info("Declined {} outstanding offers for host {}", numOutStandingOffers, hostname);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java
deleted file mode 100644
index ff36c06..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import java.util.Collection;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-
-/**
- * Utility class that provides useful methods that deal with Mesos offers.
- */
-public class OfferUtils {
-
-  /**
-   * Transforms a collection of mesos offers into {@link Resource}.
-   *
-   * @param offers collection of mesos offers
-   * @return a single resource object equivalent to the cumulative sum of mesos offers
-   */
-  public static Resource getYarnResourcesFromMesosOffers(Collection<Offer> offers) {
-    double cpus = 0.0;
-    double mem = 0.0;
-
-    for (Protos.Offer offer : offers) {
-      for (Protos.Resource resource : offer.getResourcesList()) {
-        if (resource.getName().equalsIgnoreCase("cpus")) {
-          cpus += resource.getScalar().getValue();
-        } else if (resource.getName().equalsIgnoreCase("mem")) {
-          mem += resource.getScalar().getValue();
-        }
-      }
-    }
-    return Resource.newInstance((int) mem, (int) cpus);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
deleted file mode 100644
index 936f381..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.fgs;
-
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-import com.ebay.myriad.executor.ContainerTaskStatusRequest;
-import com.ebay.myriad.scheduler.MyriadDriver;
-import com.ebay.myriad.scheduler.SchedulerUtils;
-import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
-import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the capacity exposed by NodeManager. It uses the offers available
- * from Mesos to inflate the node capacity and lets ResourceManager make the
- * scheduling decision. After the scheduling decision is done, there are 2 cases:
- * <p/>
- * 1. If ResourceManager did not use the expanded capacity, then the node's
- * capacity is reverted back to original value and the offer is declined.
- * 2. If ResourceManager ended up using the expanded capacity, then the node's
- * capacity is updated accordingly and any unused capacity is returned back to
- * Mesos.
- */
-public class YarnNodeCapacityManager extends BaseInterceptor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class);
-
-  private final AbstractYarnScheduler yarnScheduler;
-  private final RMContext rmContext;
-  private final MyriadDriver myriadDriver;
-  private final OfferLifecycleManager offerLifecycleMgr;
-  private final NodeStore nodeStore;
-  private final SchedulerState state;
-
-  @Inject
-  public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState state) {
-    if (registry != null) {
-      registry.register(this);
-    }
-    this.yarnScheduler = yarnScheduler;
-    this.rmContext = rmContext;
-    this.myriadDriver = myriadDriver;
-    this.offerLifecycleMgr = offerLifecycleMgr;
-    this.nodeStore = nodeStore;
-    this.state = state;
-  }
-
-  @Override
-  public CallBackFilter getCallBackFilter() {
-    return new CallBackFilter() {
-      @Override
-      public boolean allowCallBacksForNode(NodeId nodeManager) {
-        return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state);
-      }
-    };
-  }
-
-  @Override
-  public void afterSchedulerEventHandled(SchedulerEvent event) {
-    switch (event.getType()) {
-      case NODE_ADDED: {
-        if (!(event instanceof NodeAddedSchedulerEvent)) {
-          LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeAddedSchedulerEvent.class.getName());
-          return;
-        }
-
-        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
-        NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID();
-        String host = nodeId.getHost();
-
-        SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId);
-        nodeStore.add(node);
-        LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host);
-      }
-        break;
-
-      case NODE_UPDATE: {
-        if (!(event instanceof NodeUpdateSchedulerEvent)) {
-          LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeUpdateSchedulerEvent.class.getName());
-          return;
-        }
-
-        RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode();
-        handleContainerAllocation(rmNode);
-      }
-        break;
-
-      default:
-        break;
-    }
-  }
-
-  /**
-   * Checks if any containers were allocated in the current scheduler run and
-   * launches the corresponding Mesos tasks. It also udpates the node
-   * capacity depending on what portion of the consumed offers were actually
-   * used.
-   */
-  @VisibleForTesting
-  protected void handleContainerAllocation(RMNode rmNode) {
-    String host = rmNode.getNodeID().getHost();
-
-    ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);
-    if (consumedOffer == null) {
-      LOGGER.debug("No offer consumed for {}", host);
-      return;
-    }
-
-    Node node = nodeStore.getNode(host);
-    Set<RMContainer> containersBeforeSched = node.getContainerSnapshot();
-    Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers());
-
-    Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(containersAfterSched, containersBeforeSched);
-
-    if (containersAllocatedByMesosOffer.isEmpty()) {
-      LOGGER.debug("No containers allocated using Mesos offers for host: {}", host);
-      for (Protos.Offer offer : consumedOffer.getOffers()) {
-        offerLifecycleMgr.declineOffer(offer);
-      }
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers())));
-    } else {
-      LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size());
-
-      // Identify the Mesos tasks that need to be launched
-      List<Protos.TaskInfo> tasks = Lists.newArrayList();
-      Resource resUsed = Resource.newInstance(0, 0);
-
-      for (RMContainer newContainer : containersAllocatedByMesosOffer) {
-        tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node));
-        resUsed = Resources.add(resUsed, newContainer.getAllocatedResource());
-      }
-
-      // Reduce node capacity to account for unused offers
-      Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
-      Resource resUnused = Resources.subtract(resOffered, resUsed);
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused));
-
-      myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
-    }
-
-    // No need to hold on to the snapshot anymore
-    node.removeContainerSnapshot();
-  }
-
-  /**
-   * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
-   * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler.
-   * The scheduler updates the corresponding {@link SchedulerNode} with the newCapacity.
-   *
-   * @param rmNode
-   * @param newCapacity
-   */
-  @SuppressWarnings("unchecked")
-  public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
-    rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
-    rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-    LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
-    // updates the scheduler with the new capacity for the NM.
-    // the event is handled by the scheduler asynchronously
-    rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
-  }
-
-  private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) {
-
-    Protos.Offer offer = consumedOffer.getOffers().get(0);
-    Container container = rmContainer.getContainer();
-    Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build();
-
-    // TODO (sdaingade) Remove ExecutorInfo from the Node object
-    // as this is now cached in the NodeTask object in scheduler state.
-    Protos.ExecutorInfo executorInfo = node.getExecInfo();
-    if (executorInfo == null) {
-      executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build();
-      node.setExecInfo(executorInfo);
-    }
-
-    return Protos.TaskInfo.newBuilder().setName("task_" + taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar
-        .newBuilder().setValue(container.getResource().getVirtualCores()))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource()
-        .getMemory()))).setExecutor(executorInfo).build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
deleted file mode 100644
index c480c09..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn;
-
-import com.ebay.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
-import com.ebay.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
-/**
- * {@link MyriadCapacityScheduler} just extends YARN's {@link CapacityScheduler} and
- * allows some of the {@link CapacityScheduler} methods to be intercepted
- * via the {@link YarnSchedulerInterceptor} interface.
- */
-public class MyriadCapacityScheduler extends CapacityScheduler {
-  private Configuration conf;
-
-  private RMContext rmContext;
-  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
-  private RMNodeEventHandler rmNodeEventHandler;
-
-  public MyriadCapacityScheduler() {
-    super();
-  }
-
-  /**
-   * Register an event handler that receives {@link RMNodeEvent} events.
-   * This event handler is registered ahead of RM's own event handler for RMNodeEvents.
-   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by
-   * RM and the scheduler.
-   *
-   * @param rmContext
-   */
-  @Override
-  public synchronized void setRMContext(RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.yarnSchedulerInterceptor = new CompositeInterceptor();
-    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext);
-    rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler);
-    super.setRMContext(rmContext);
-  }
-
-  /**
-   * ******** Methods overridden from YARN {@link CapacityScheduler}  *********************
-   */
-
-  @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
-    this.conf = conf;
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public synchronized void serviceStart() throws Exception {
-    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
-    super.serviceStart();
-  }
-
-  @Override
-  public synchronized void handle(SchedulerEvent event) {
-    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
-    super.handle(event);
-    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
deleted file mode 100644
index 5991be2..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn;
-
-import com.ebay.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
-import com.ebay.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-
-/**
- * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and
- * allows some of the {@link FairScheduler} methods to be intercepted
- * via the {@link YarnSchedulerInterceptor} interface.
- */
-public class MyriadFairScheduler extends FairScheduler {
-
-  private RMContext rmContext;
-  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
-  private RMNodeEventHandler rmNodeEventHandler;
-  private Configuration conf;
-
-  public MyriadFairScheduler() {
-    super();
-  }
-
-  /**
-   * Register an event handler that receives {@link RMNodeEvent} events.
-   * This event handler is registered ahead of RM's own event handler for RMNodeEvents.
-   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by
-   * RM and the scheduler.
-   *
-   * @param rmContext
-   */
-  @Override
-  public synchronized void setRMContext(RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.yarnSchedulerInterceptor = new CompositeInterceptor();
-    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext);
-    rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler);
-    super.setRMContext(rmContext);
-  }
-
-  /**
-   * ******** Methods overridden from YARN {@link FairScheduler}  *********************
-   */
-
-  @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
-    this.conf = conf;
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public synchronized void serviceStart() throws Exception {
-    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
-    super.serviceStart();
-  }
-
-  @Override
-  public synchronized void handle(SchedulerEvent event) {
-    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
-    super.handle(event);
-    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
deleted file mode 100644
index 008953e..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn;
-
-import com.ebay.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
-import com.ebay.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-
-/**
- * {@link MyriadFifoScheduler} just extends YARN's {@link FifoScheduler} and
- * allows some of the {@link FifoScheduler} methods to be intercepted
- * via the {@link YarnSchedulerInterceptor} interface.
- */
-public class MyriadFifoScheduler extends FifoScheduler {
-  private Configuration conf;
-
-  private RMContext rmContext;
-  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
-  private RMNodeEventHandler rmNodeEventHandler;
-
-  public MyriadFifoScheduler() {
-    super();
-  }
-
-  /**
-   * Register an event handler that receives {@link RMNodeEvent} events.
-   * This event handler is registered ahead of RM's own event handler for RMNodeEvents.
-   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by
-   * RM and the scheduler.
-   *
-   * @param rmContext
-   */
-  @Override
-  public synchronized void setRMContext(RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.yarnSchedulerInterceptor = new CompositeInterceptor();
-    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext);
-    rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler);
-    super.setRMContext(rmContext);
-  }
-
-  /**
-   * ******** Methods overridden from YARN {@link FifoScheduler}  *********************
-   */
-
-  @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
-    this.conf = conf;
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public synchronized void serviceStart() throws Exception {
-    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
-    super.serviceStart();
-  }
-
-  @Override
-  public synchronized void handle(SchedulerEvent event) {
-    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
-    super.handle(event);
-    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java
deleted file mode 100644
index c758ea4..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn;
-
-import com.ebay.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-
-/**
- * Passes the {@link RMNodeEvent} events into the {@link YarnSchedulerInterceptor}.
- */
-public class RMNodeEventHandler implements EventHandler<RMNodeEvent> {
-  private final YarnSchedulerInterceptor interceptor;
-  private final RMContext rmContext;
-
-  public RMNodeEventHandler(YarnSchedulerInterceptor interceptor, RMContext rmContext) {
-    this.interceptor = interceptor;
-    this.rmContext = rmContext;
-  }
-
-  @Override
-  public void handle(RMNodeEvent event) {
-    interceptor.beforeRMNodeEventHandled(event, rmContext);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
deleted file mode 100644
index b3b37e5..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn.interceptor;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
-import java.io.IOException;
-
-/**
- * A no-op interceptor whose sole purpose is to serve as a base class
- * for other interceptors. Child interceptors can selectively override the
- * required methods.
- */
-public class BaseInterceptor implements YarnSchedulerInterceptor {
-  // restrict the constructor
-  protected BaseInterceptor() {
-  }
-
-  @Override
-  public CallBackFilter getCallBackFilter() {
-    return new CallBackFilter() {
-      @Override
-      public boolean allowCallBacksForNode(NodeId nodeManager) {
-        return true;
-      }
-    };
-  }
-
-  @Override
-  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-  }
-
-  @Override
-  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
-
-  }
-
-  @Override
-  public void beforeSchedulerEventHandled(SchedulerEvent event) {
-
-  }
-
-  @Override
-  public void afterSchedulerEventHandled(SchedulerEvent event) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
deleted file mode 100644
index 6ae8b7e..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn.interceptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * An interceptor that wraps other interceptors. The Myriad{Fair,Capacity,Fifo}Scheduler classes
- * instantiate this class and allow interception of the Yarn scheduler events/method calls.
- * <p/>
- * The {@link CompositeInterceptor} allows other interceptors to be registered via {@link InterceptorRegistry}
- * and passes control to the registered interceptors whenever a event/method call is being intercepted.
- */
-public class CompositeInterceptor implements YarnSchedulerInterceptor, InterceptorRegistry {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class);
-
-  private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap();
-  private YarnSchedulerInterceptor myriadInitInterceptor;
-
-  /**
-   * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of
-   * {@link MyriadInitializationInterceptor}.
-   */
-  public CompositeInterceptor() {
-    this.myriadInitInterceptor = new MyriadInitializationInterceptor(this);
-  }
-
-  @VisibleForTesting
-  public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) {
-    this.myriadInitInterceptor = myriadInitInterceptor;
-  }
-
-  @Override
-  public void register(YarnSchedulerInterceptor interceptor) {
-    interceptors.put(interceptor.getClass(), interceptor);
-    LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName());
-  }
-
-  @Override
-  public CallBackFilter getCallBackFilter() {
-    return new CallBackFilter() {
-      @Override
-      public boolean allowCallBacksForNode(NodeId nodeManager) {
-        return true;
-      }
-    };
-  }
-
-  /**
-   * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized,
-   * other interceptors will later register with this class via
-   * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}.
-   *
-   * @param conf
-   * @param yarnScheduler
-   * @param rmContext
-   * @throws IOException
-   */
-  @Override
-  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-    myriadInitInterceptor.init(conf, yarnScheduler, rmContext);
-  }
-
-  @Override
-  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
-    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-      if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) {
-        interceptor.beforeRMNodeEventHandled(event, context);
-      }
-    }
-  }
-
-  @Override
-  public void beforeSchedulerEventHandled(SchedulerEvent event) {
-    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-      final NodeId nodeId = getNodeIdForSchedulerEvent(event);
-      if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
-        interceptor.beforeSchedulerEventHandled(event);
-      }
-    }
-  }
-
-  @Override
-  public void afterSchedulerEventHandled(SchedulerEvent event) {
-    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-      NodeId nodeId = getNodeIdForSchedulerEvent(event);
-      if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
-        interceptor.afterSchedulerEventHandled(event);
-      }
-    }
-  }
-
-  private NodeId getNodeIdForSchedulerEvent(SchedulerEvent event) {
-    switch (event.getType()) {
-      case NODE_ADDED:
-        return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID();
-      case NODE_REMOVED:
-        return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID();
-      case NODE_UPDATE:
-        return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID();
-      case NODE_RESOURCE_UPDATE:
-        return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID();
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
deleted file mode 100644
index 3a504b1..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn.interceptor;
-
-/**
- * Allows registration of {@link YarnSchedulerInterceptor}.
- */
-public interface InterceptorRegistry {
-
-  public void register(YarnSchedulerInterceptor interceptor);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
deleted file mode 100644
index 71580b3..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn.interceptor;
-
-import com.ebay.myriad.Main;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Responsible for intializing myriad.
- */
-public class MyriadInitializationInterceptor extends BaseInterceptor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class);
-
-  private final InterceptorRegistry registry;
-
-  public MyriadInitializationInterceptor(InterceptorRegistry registry) {
-    this.registry = registry;
-  }
-
-  /**
-   * Initialize Myriad plugin before RM's scheduler is initialized.
-   * This includes registration with Mesos master, initialization of
-   * the myriad web application, initializing guice modules etc.
-   */
-  @Override
-  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-    try {
-      Main.initialize(conf, yarnScheduler, rmContext, registry);
-    } catch (Exception e) {
-      // Abort bringing up RM
-      throw new RuntimeException("Failed to initialize myriad", e);
-    }
-    LOGGER.info("Initialized myriad.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
deleted file mode 100644
index e28d052..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler.yarn.interceptor;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-
-import java.io.IOException;
-
-/**
- * Allows interception of YARN's scheduler events (or methods).
- */
-public interface YarnSchedulerInterceptor {
-
-  /**
-   * Filters the method callbacks.
-   */
-  interface CallBackFilter {
-    /**
-     * Method to determine if any other methods in {@link YarnSchedulerInterceptor}
-     * pertaining to a given node manager should be invoked or not.
-     *
-     * @param nodeManager NodeId of the Node Manager registered with RM.
-     * @return true to allow invoking further interceptor methods. false otherwise.
-     */
-    public boolean allowCallBacksForNode(NodeId nodeManager);
-  }
-
-  /**
-   * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)}
-   * method is invoked to *determine* if any of the other methods pertaining to a specific node
-   * needs to be invoked or not.
-   *
-   * @return
-   */
-  public CallBackFilter getCallBackFilter();
-
-  /**
-   * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)}
-   *
-   * @param conf
-   * @param yarnScheduler
-   * @param rmContext
-   * @throws IOException
-   */
-  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException;
-
-  /**
-   * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if
-   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-   *
-   * @param event
-   * @param context
-   */
-  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context);
-
-  /**
-   * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
-   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-   *
-   * @param event
-   */
-  public void beforeSchedulerEventHandled(SchedulerEvent event);
-
-  /**
-   * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
-   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-   *
-   * @param event
-   */
-  public void afterSchedulerEventHandled(SchedulerEvent event);
-
-}


Mime
View raw message