myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [08/10] incubator-myriad git commit: Cleaned up flexdown code
Date Thu, 15 Oct 2015 21:00:20 GMT
Cleaned up flexdown code

- Fixed a bunch of edge cases, especially when a constraint is not specified in the flexdown request
- Refactored the code to make it more readable; flexdown now happens in 3 (readable) steps:
   - flexdown pending tasks matching the given profile, constraints
   - flexdown staging tasks matching the given profile, constraints
   - flexdown active tasks matching the given profile, constraints
- Minor cleanups of other classes with respect to logging, and removal of unused methods.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/34e3958c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/34e3958c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/34e3958c

Branch: refs/heads/master
Commit: 34e3958c4134b079fbc766f4fe4d2f88b42840f4
Parents: e8ec517
Author: Santosh Marella <smarella@maprtech.com>
Authored: Tue Oct 13 13:17:05 2015 -0700
Committer: Santosh Marella <marella@gmail.com>
Committed: Thu Oct 15 12:56:46 2015 -0700

----------------------------------------------------------------------
 .../com/ebay/myriad/api/ClustersResource.java   |  16 ++-
 .../myriad/policy/LeastAMNodesFirstPolicy.java  |  47 ++++---
 .../ebay/myriad/policy/NodeScaleDownPolicy.java |  10 +-
 .../ebay/myriad/scheduler/MyriadOperations.java | 132 ++++++++-----------
 .../ebay/myriad/scheduler/SchedulerUtils.java   |  24 ----
 .../scheduler/constraints/LikeConstraint.java   |   4 +-
 .../handlers/ResourceOffersEventHandler.java    |  11 +-
 .../com/ebay/myriad/state/SchedulerState.java   |  53 +++++---
 .../constraints/LikeConstraintSpec.groovy       |  12 ++
 9 files changed, 148 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
index f2a3018..9f47b51 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
@@ -19,6 +19,7 @@ import com.codahale.metrics.annotation.Timed;
 import com.ebay.myriad.api.model.FlexDownClusterRequest;
 import com.ebay.myriad.api.model.FlexUpClusterRequest;
 import com.ebay.myriad.scheduler.MyriadOperations;
+import com.ebay.myriad.scheduler.NMProfile;
 import com.ebay.myriad.scheduler.NMProfileManager;
 import com.ebay.myriad.scheduler.constraints.ConstraintFactory;
 import com.ebay.myriad.state.SchedulerState;
@@ -108,11 +109,11 @@ public class ClustersResource {
         isValidRequest = isValidRequest && validateInstances(instances, response);
         isValidRequest = isValidRequest && validateConstraints(constraints, response);
 
-        Integer numFlexedUp = this.getNumFlexedupNMs();
+        Integer numFlexedUp = this.getNumFlexedupNMs(profile);
         if (isValidRequest && numFlexedUp < instances)  {
             String message = String.format("Number of requested instances for flexdown is
greater than the number of " +
-                "Node Managers previously flexed up. Requested: %d, Previously flexed Up:
%d. " +
-                "Only %d Node Managers will be flexed down", instances, numFlexedUp, numFlexedUp);
+                "Node Managers previously flexed up for profile '%s'. Requested: %d, Previously
flexed Up: %d. " +
+                "Only %d Node Managers will be flexed down.", profile, instances, numFlexedUp,
numFlexedUp);
             response.entity(message);
             LOGGER.warn(message);
         }
@@ -203,10 +204,11 @@ public class ClustersResource {
     }
 
 
-    private Integer getNumFlexedupNMs() {
-        return this.schedulerState.getActiveTaskIds().size()
-                + this.schedulerState.getStagingTaskIds().size()
-                + this.schedulerState.getPendingTaskIds().size();
+    private Integer getNumFlexedupNMs(String profile) {
+      NMProfile nmProfile = profileManager.get(profile);
+      return this.schedulerState.getActiveTaskIDsForProfile(nmProfile).size()
+                + this.schedulerState.getStagingTaskIDsForProfile(nmProfile).size()
+                + this.schedulerState.getPendingTaskIDsForProfile(nmProfile).size();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
index 38b14a7..568247b 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
@@ -2,7 +2,7 @@ package com.ebay.myriad.policy;
 
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import com.google.common.collect.Lists;
+import com.ebay.myriad.state.SchedulerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -10,11 +10,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -28,6 +28,7 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements
NodeScal
     private static final Logger LOGGER = LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class);
 
     private final AbstractYarnScheduler yarnScheduler;
+    private final SchedulerState schedulerState;
 
     //TODO(Santosh): Should figure out the right values for the hashmap properties.
     // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's default).
@@ -35,20 +36,26 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements
NodeScal
     private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50;
     private static final float LOAD_FACTOR_DEFAULT = 0.75f;
 
-    private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE,
LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT);
+    private Map<String, SchedulerNode> schedulerNodes =
+        new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT);
 
     @Inject
-    public LeastAMNodesFirstPolicy(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler)
{
+    public LeastAMNodesFirstPolicy(InterceptorRegistry registry,
+                                   AbstractYarnScheduler yarnScheduler,
+                                   SchedulerState schedulerState) {
         registry.register(this);
         this.yarnScheduler = yarnScheduler;
+        this.schedulerState = schedulerState;
     }
 
+    /**
+     *  Sort the given list of tasks by the number of App Master containers running on the
corresponding NM node.
+     * @param taskIDs
+     */
     @Override
-    public List<String> getNodesToScaleDown() {
-        List<SchedulerNode> nodes = Lists.newArrayList(this.schedulerNodes.values());
-
+    public void apply(List<Protos.TaskID> taskIDs) {
         if (LOGGER.isDebugEnabled()) {
-            for (SchedulerNode node : nodes) {
+            for (SchedulerNode node : schedulerNodes.values()) {
                 LOGGER.debug("Host {} is running {} containers including {} App Masters",
                         node.getNodeID().getHost(), node.getRunningContainers().size(),
                         getNumAMContainers(node.getRunningContainers()));
@@ -58,9 +65,22 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements
NodeScal
         // process HBs from NodeManagers and the state of SchedulerNode objects might change
while we
         // are in the middle of sorting them based on the least number of AM containers.
         synchronized (yarnScheduler) {
-            Collections.sort(nodes, new Comparator<SchedulerNode>() {
+            Collections.sort(taskIDs, new Comparator<Protos.TaskID>() {
                 @Override
-                public int compare(SchedulerNode o1, SchedulerNode o2) {
+                public int compare(Protos.TaskID t1, Protos.TaskID t2) {
+                    SchedulerNode o1 = schedulerNodes.get(schedulerState.getTask(t1).getHostname());
+                    SchedulerNode o2 = schedulerNodes.get(schedulerState.getTask(t2).getHostname());
+
+                    if (o1 == null) { // a NM was launched by Myriad, but it hasn't yet registered
with RM
+                      if (o2 == null) {
+                        return 0;
+                      } else {
+                        return -1;
+                      }
+                    } else if (o2 == null) {
+                      return 1;
+                    } // else, both the NMs have registered with RM
+
                     List<RMContainer> runningContainers1 = o1.getRunningContainers();
                     List<RMContainer> runningContainers2 = o2.getRunningContainers();
 
@@ -78,13 +98,6 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements
NodeScal
                 }
             });
         }
-
-        List<String> hosts = new ArrayList<>(nodes.size());
-        for (SchedulerNode node : nodes) {
-            hosts.add(node.getNodeID().getHost());
-        }
-
-        return hosts;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
index db80761..f40d360 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
@@ -1,5 +1,7 @@
 package com.ebay.myriad.policy;
 
+import org.apache.mesos.Protos;
+
 import java.util.List;
 
 /**
@@ -8,11 +10,9 @@ import java.util.List;
 public interface NodeScaleDownPolicy {
 
     /**
-     * Get a list of host names of the nodes that needs to be scaled down.
-     * The implementation of the policy should populate this list in a way that
-     * the most preferred nodes to be scaled down should occur first in the list.
-     * @return
+     * Apply a scale down policy to the given list of taskIDs.
+     * @param taskIDs
      */
-    public List<String> getNodesToScaleDown();
+    public void apply(List<Protos.TaskID> taskIDs);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 540d3e7..27fe406 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -20,17 +20,16 @@ import com.ebay.myriad.scheduler.constraints.Constraint;
 import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
 /**
  * Myriad scheduler operations
  */
@@ -57,68 +56,61 @@ public class MyriadOperations {
 
     public void flexDownCluster(NMProfile profile, Constraint constraint, int numInstancesToScaleDown)
{
         // Flex down Pending tasks, if any
-        int numPendingTasksScaledDown = 0;
-          Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds());
-
-          for (Protos.TaskID taskId : pendingTasks) {
-            NodeTask nodeTask = schedulerState.getTask(taskId);
-            if (nodeTask != null && nodeTask.getProfile().getName().equals(profile.getName())
&&
-                meetsConstraint(nodeTask, constraint)) {
-              this.schedulerState.makeTaskKillable(taskId);
-              numPendingTasksScaledDown++;
-              if (numPendingTasksScaledDown == numInstancesToScaleDown) {
-                break;
-              }
-            }
-          }
+        int numPendingTasksScaledDown = flexDownPendingTasks(
+            profile, constraint, numInstancesToScaleDown);
 
         // Flex down Staging tasks, if any
-        int numStagingTasksScaledDown = 0;
-        if (numPendingTasksScaledDown < numInstancesToScaleDown) {
-          Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds());
-
-          for (Protos.TaskID taskId : stagingTasks) {
-            NodeTask nodeTask = schedulerState.getTask(taskId);
-            if (nodeTask != null && nodeTask.getProfile().getName().equals(profile.getName())
&&
-                meetsConstraint(nodeTask, constraint)) {
-              this.schedulerState.makeTaskKillable(taskId);
-              numStagingTasksScaledDown++;
-              if (numStagingTasksScaledDown + numPendingTasksScaledDown == numInstancesToScaleDown)
{
-                break;
-              }
-            }
-          }
-        }
+        int numStagingTasksScaledDown = flexDownStagingTasks(
+            profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
 
-        int numActiveTasksScaledDown = 0;
-        if (numPendingTasksScaledDown + numStagingTasksScaledDown < numInstancesToScaleDown)
{
-          Set<NodeTask> activeTasksForProfile = Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile));
-          List<String> nodesToScaleDown = nodeScaleDownPolicy.getNodesToScaleDown();
-          filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown);
-
-          for (int i = 0; i < numInstancesToScaleDown - (numPendingTasksScaledDown + numStagingTasksScaledDown);
i++) {
-            for (NodeTask nodeTask : activeTasksForProfile) {
-              if (nodesToScaleDown.size() > i &&
-                  nodesToScaleDown.get(i).equals(nodeTask.getHostname()) &&
-                  meetsConstraint(nodeTask, constraint)) {
-                this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-                numActiveTasksScaledDown++;
-                if (LOGGER.isDebugEnabled()) {
-                  LOGGER.debug("Marked NodeTask {} on host {} for kill.",
-                      nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
-                }
-              }
-            }
-          }
-        }
+        // Flex down Active tasks, if any
+        int numActiveTasksScaledDown = flexDownActiveTasks(
+            profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
 
         if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown
== 0) {
-          LOGGER.info("No Node Managers with profile '{}' and constraint {} found for scaling
down.",
-              profile.getName(), constraint.toString());
+          LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling
down.",
+              profile.getName(), constraint == null ? "null" : constraint.toString());
         } else {
-          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with
'{}' profile.",
-              numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown,
profile.getName());
+          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with
" +
+              "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown,
+              numPendingTasksScaledDown, profile.getName(), constraint == null ? "null" :
constraint.toString());
+        }
+    }
+
+    private int flexDownPendingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown)
{
+      return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile),
+          profile, constraint, numInstancesToScaleDown) : 0;
+    }
+
+  private int flexDownStagingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown)
{
+      return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile),
+          profile, constraint, numInstancesToScaleDown) : 0;
+    }
+
+    private int flexDownActiveTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown)
{
+      if (numInstancesToScaleDown > 0) {
+        List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
+        nodeScaleDownPolicy.apply(activeTasksForProfile);
+        return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown);
+      }
+      return 0;
+    }
+
+  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, NMProfile profile,
+                              Constraint constraint, int numInstancesToScaleDown) {
+      int numInstancesScaledDown = 0;
+      for (Protos.TaskID taskID : taskIDs) {
+        NodeTask nodeTask = schedulerState.getTask(taskID);
+        if (nodeTask.getProfile().getName().equals(profile.getName()) &&
+            meetsConstraint(nodeTask, constraint)) {
+          this.schedulerState.makeTaskKillable(taskID);
+          numInstancesScaledDown++;
+          if (numInstancesScaledDown == numInstancesToScaleDown) {
+            break;
+          }
         }
+      }
+      return numInstancesScaledDown;
     }
 
   private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
@@ -142,22 +134,4 @@ public class MyriadOperations {
     return true;
   }
 
-  private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, List<String>
registeredNMHosts) {
-    // If a NM is flexed down it takes time for the RM to realize the NM is no longer up
-    // We need to make sure we filter out nodes that have already been flexed down
-    // but have not disappeared from the RM's view of the cluster
-    for (Iterator<String> iterator = registeredNMHosts.iterator(); iterator.hasNext();)
{
-        String nodeToScaleDown = iterator.next();
-        boolean nodePresentInMyriad = false;
-        for (NodeTask nodeTask : activeTasksForProfile) {
-            if (nodeTask.getHostname().equals(nodeToScaleDown)) {
-                nodePresentInMyriad = true;
-                break;
-            }
-        }
-        if (!nodePresentInMyriad) {
-            iterator.remove();
-        }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
index 36da5b1..46a3d89 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
@@ -19,16 +19,11 @@ import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Attribute;
-import org.apache.mesos.Protos.Offer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Provides utilities for scheduling with the mesos offers
@@ -36,25 +31,6 @@ import java.util.Map;
 public class SchedulerUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class);
 
-    public static boolean isMatchSlaveAttributes(Offer offer, Map<String, String> requestAttributes)
{
-        boolean match = true;
-
-        Map<String, String> offerAttributes = new HashMap<>();
-        for (Attribute attribute : offer.getAttributesList()) {
-            offerAttributes.put(attribute.getName(), attribute.getText().getValue());
-        }
-
-        // Match with offer attributes only if request has attributes.
-        if (!MapUtils.isEmpty(requestAttributes)) {
-            match = offerAttributes.equals(requestAttributes);
-        }
-
-        LOGGER.debug("Match status: {} for offer: {} and requestAttributes: {}",
-                match, offer, requestAttributes);
-
-        return match;
-    }
-
     public static boolean isUniqueHostname(Protos.OfferOrBuilder offer,
                                            Collection<NodeTask> tasks) {
         Preconditions.checkArgument(offer != null);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
index 5092783..727a19c 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
@@ -80,7 +80,7 @@ public class LikeConstraint implements Constraint {
     if (lhs != null ? !lhs.equals(that.lhs) : that.lhs != null) {
       return false;
     }
-    if (pattern != null ? !pattern.equals(that.pattern) : that.pattern != null) {
+    if (pattern != null ? !pattern.pattern().equals(that.pattern.pattern()) : that.pattern
!= null) {
       return false;
     }
 
@@ -90,7 +90,7 @@ public class LikeConstraint implements Constraint {
   @Override
   public int hashCode() {
     int result = lhs != null ? lhs.hashCode() : 0;
-    result = 31 * result + (pattern != null ? pattern.hashCode() : 0);
+    result = 31 * result + (pattern != null ? pattern.pattern().hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 1ce647f..cd90f56 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -177,7 +177,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     checkResource(mem < 0, "mem");
     checkResource(ports < 0, "port");
 
-    return checkAggregates(offer, profile, ports, cpus, mem);
+    return checkAggregates(profile, ports, cpus, mem);
   }
 
   private boolean meetsConstraint(Offer offer, Constraint constraint) {
@@ -203,17 +203,16 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
     }
   }
 
-  private boolean checkAggregates(Offer offer, NMProfile profile, int ports, double cpus,
double mem) {
-    Map<String, String> requestAttributes = new HashMap<>();
+  private boolean checkAggregates(NMProfile profile, int ports, double cpus, double mem)
{
 
     if (taskUtils.getAggregateCpus(profile) <= cpus
         && taskUtils.getAggregateMemory(profile) <= mem
-        && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes)
         && NMPorts.expectedNumPorts() <= ports) {
       return true;
     } else {
-      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}",
-          taskUtils.getAggregateCpus(profile), taskUtils.getAggregateMemory(profile), ports);
+      LOGGER.info("Offer not sufficient for launching task. Task requires cpu: {}, memory:
{}, # of ports: {}. " +
+          "Offer has cpu: {}, memory: {}, # of ports: {}", taskUtils.getAggregateCpus(profile),
+          taskUtils.getAggregateMemory(profile), NMPorts.expectedNumPorts(), cpus, mem, ports);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index e428a1d..28aa17d 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -16,26 +16,15 @@
 package com.ebay.myriad.state;
 
 import com.ebay.myriad.scheduler.NMProfile;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.ebay.myriad.state.utils.StoreContext;
 import org.apache.commons.collections.CollectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.SlaveID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.ebay.myriad.state.utils.StoreContext;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Represents the state of the Myriad scheduler
@@ -175,6 +164,17 @@ public class SchedulerState {
         return Collections.unmodifiableSet(this.pendingTasks);
     }
 
+    public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(NMProfile
profile) {
+      List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        NodeTask nodeTask = entry.getValue();
+        if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName()))
{
+          pendingTaskIds.add(entry.getKey());
+        }
+      }
+      return Collections.unmodifiableCollection(pendingTaskIds);
+    }
+
     public synchronized Set<Protos.TaskID> getActiveTaskIds() {
         return Collections.unmodifiableSet(this.activeTasks);
     }
@@ -192,18 +192,18 @@ public class SchedulerState {
         return Collections.unmodifiableCollection(activeNodeTasks);
     }
 
-    public synchronized Collection<NodeTask> getActiveTasksForProfile(NMProfile profile)
{
-      List<NodeTask> activeNodeTasks = new ArrayList<>();
+    public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(NMProfile
profile) {
+      List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
       if (CollectionUtils.isNotEmpty(activeTasks)
           && CollectionUtils.isNotEmpty(tasks.values())) {
         for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
           NodeTask nodeTask = entry.getValue();
           if (activeTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName()))
{
-            activeNodeTasks.add(nodeTask);
+            activeTaskIDs.add(entry.getKey());
           }
         }
       }
-      return Collections.unmodifiableCollection(activeNodeTasks);
+      return Collections.unmodifiableCollection(activeTaskIDs);
     }
 
   // TODO (sdaingade) Clone NodeTask
@@ -221,7 +221,18 @@ public class SchedulerState {
         return Collections.unmodifiableSet(this.stagingTasks);
     }
 
-    public synchronized Set<Protos.TaskID> getLostTaskIds() {
+    public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(NMProfile
profile) {
+      List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        NodeTask nodeTask = entry.getValue();
+        if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName()))
{
+          stagingTaskIDs.add(entry.getKey());
+        }
+      }
+      return Collections.unmodifiableCollection(stagingTaskIDs);
+    }
+
+  public synchronized Set<Protos.TaskID> getLostTaskIds() {
         return Collections.unmodifiableSet(this.lostTasks);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
index f2972a7..5504f33 100644
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
+++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
@@ -50,6 +50,18 @@ class LikeConstraintSpec extends Specification {
         getTextAttribute("random", "random value"))                | true
   }
 
+  def "equals"() {
+    given:
+    def constraint1 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab")
+    def constraint2 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab")
+    def constraint3 = new LikeConstraint("hostname", "perfnode133.perf.lab")
+
+    expect:
+    constraint1.equals(constraint2)
+    !constraint1.equals(constraint3)
+    !constraint2.equals(constraint3)
+  }
+
   private static Protos.Attribute getTextAttribute(String name, String value) {
     Protos.Attribute.newBuilder()
         .setName(name)


Mime
View raw message