aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Adding instrumentation into the scheduling pipeline.
Date Thu, 20 Nov 2014 21:23:36 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6d06d8639 -> ada97bdbb


Adding instrumentation into the scheduling pipeline.

Bugs closed: AURORA-914

Reviewed at https://reviews.apache.org/r/27705/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ada97bdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ada97bdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ada97bdb

Branch: refs/heads/master
Commit: ada97bdbb701bb6131152cc0439eba9d334643ba
Parents: 6d06d86
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Nov 20 13:20:31 2014 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Nov 20 13:20:31 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/TaskVars.java   |  55 ++++++++++
 .../aurora/scheduler/async/OfferQueue.java      |  56 ++++++----
 .../aurora/scheduler/async/TaskScheduler.java   |  79 ++++++-------
 .../aurora/scheduler/events/PubsubEvent.java    |   8 ++
 .../scheduler/filter/ConstraintMatcher.java     |  22 +---
 .../scheduler/filter/SchedulingFilter.java      | 110 +++++++++++++++----
 .../scheduler/filter/SchedulingFilterImpl.java  |   4 +-
 .../aurora/scheduler/metadata/NearestFit.java   |   7 +-
 .../aurora/scheduler/stats/CachedCounters.java  |   8 +-
 .../apache/aurora/scheduler/TaskVarsTest.java   |  54 +++++++++
 .../events/NotifyingSchedulingFilterTest.java   |   4 +-
 .../filter/SchedulingFilterImplTest.java        |  27 +++--
 .../scheduler/metadata/NearestFitTest.java      |   6 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |   2 +-
 14 files changed, 314 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index cf8f758..2f93054 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -29,6 +29,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
@@ -40,6 +41,9 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -57,6 +61,35 @@ class TaskVars extends AbstractIdleService implements EventSubscriber {
   private static final ImmutableSet<ScheduleStatus> TRACKED_JOB_STATES =
       ImmutableSet.of(ScheduleStatus.LOST, ScheduleStatus.FAILED);
 
+  /**
+   * Tracks {@link Vetoed} events that have strictly static vetoes to determine task/offer
+   * satisfiability. Vetoes are considered static if the cached offer will never be able
to satisfy
+   * a given task requirements.
+   */
+  @VisibleForTesting
+  static final String VETO_STATIC_NAME = "scheduling_veto_static";
+
+  /**
+   * Tracks {@link Vetoed} events that have strictly dynamic vetoes to determine task/offer
+   * satisfiability. Vetoes are considered dynamic if the cached offer may still satisfy
a given
+   * task requirements if cluster conditions change (e.g. other tasks are killed or rescheduled).
+   */
+  @VisibleForTesting
+  static final String VETO_DYNAMIC_NAME = "scheduling_veto_dynamic";
+
+  /**
+   * Tracks {@link Vetoed} events that have both static and dynamic vetoes.
+   */
+  @VisibleForTesting
+  static final String VETO_MIXED_NAME = "scheduling_veto_mixed";
+
+  private static final Map<VetoType, String> VETO_TYPES_TO_COUNTERS = ImmutableMap.of(
+      VetoType.INSUFFICIENT_RESOURCES, VETO_STATIC_NAME,
+      VetoType.CONSTRAINT_MISMATCH, VETO_STATIC_NAME,
+      VetoType.LIMIT_NOT_SATISFIED, VETO_DYNAMIC_NAME,
+      VetoType.MAINTENANCE, VETO_STATIC_NAME
+  );
+
   private final LoadingCache<String, Counter> counters;
   private final LoadingCache<String, Counter> untrackedCounters;
   private final Storage storage;
@@ -214,6 +247,28 @@ class TaskVars extends AbstractIdleService implements EventSubscriber
{
     }
   }
 
+  @Subscribe
+  public void taskVetoed(Vetoed event) {
+    // This handler has high call frequency and as such needs to be optimized for reduced
+    // heap churn. We are going to use loop flags to avoid additional object creation.
+    String metricName = null;
+    for (Veto veto : event.getVetoes()) {
+      String currentName = VETO_TYPES_TO_COUNTERS.get(veto.getVetoType());
+      if (currentName == null) {
+        throw new IllegalStateException("Unknown veto type in " + event);
+      }
+
+      if (metricName == null) {
+        metricName = currentName;
+      } else if (!currentName.equals(metricName)) {
+        counters.getUnchecked(VETO_MIXED_NAME).increment();
+        return;
+      }
+    }
+
+    counters.getUnchecked(metricName).increment();
+  }
+
   private static class Counter implements Supplier<Long> {
     private final AtomicLong value = new AtomicLong();
     private boolean exported = false;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index d2682cd..f663838 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.eventbus.Subscribe;
+import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
@@ -287,6 +288,7 @@ public interface OfferQueue extends EventSubscriber {
       }
     }
 
+    @Timed("offer_queue_launch_first")
     @Override
     public boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor)
         throws LaunchException {
@@ -295,28 +297,40 @@ public interface OfferQueue extends EventSubscriber {
       // possibility of a race between the same offers being accepted by different threads.
 
       for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
-        Optional<TaskInfo> assignment = acceptor.apply(offer);
-        if (assignment.isPresent()) {
-          // Guard against an offer being removed after we grabbed it from the iterator.
-          // If that happens, the offer will not exist in hostOffers, and we can immediately
-          // send it back to LOST for quick reschedule.
-          // Removing while iterating counts on the use of a weakly-consistent iterator being
used,
-          // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(offer.getOffer().getId())) {
-            try {
-              driver.launchTask(offer.getOffer().getId(), assignment.get());
-              return true;
-            } catch (IllegalStateException e) {
-              // TODO(William Farner): Catch only the checked exception produced by Driver
-              // once it changes from throwing IllegalStateException when the driver is not
yet
-              // registered.
-              throw new LaunchException("Failed to launch task.", e);
-            }
-          } else {
-            offerRaces.incrementAndGet();
-            throw new LaunchException(
-                "Accepted offer no longer exists in offer queue, likely data race.");
+        if (acceptOffer(offer, acceptor)) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    @Timed("offer_queue_accept_offer")
+    protected boolean acceptOffer(
+        HostOffer offer,
+        Function<HostOffer, Optional<TaskInfo>> acceptor) throws LaunchException
{
+
+      Optional<TaskInfo> assignment = acceptor.apply(offer);
+      if (assignment.isPresent()) {
+        // Guard against an offer being removed after we grabbed it from the iterator.
+        // If that happens, the offer will not exist in hostOffers, and we can immediately
+        // send it back to LOST for quick reschedule.
+        // Removing while iterating counts on the use of a weakly-consistent iterator being
used,
+        // which is a feature of ConcurrentSkipListSet.
+        if (hostOffers.remove(offer.getOffer().getId())) {
+          try {
+            driver.launchTask(offer.getOffer().getId(), assignment.get());
+            return true;
+          } catch (IllegalStateException e) {
+            // TODO(William Farner): Catch only the checked exception produced by Driver
+            // once it changes from throwing IllegalStateException when the driver is not
yet
+            // registered.
+            throw new LaunchException("Failed to launch task.", e);
           }
+        } else {
+          offerRaces.incrementAndGet();
+          throw new LaunchException(
+              "Accepted offer no longer exists in offer queue, likely data race.");
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index f247ccf..626545a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -188,43 +188,7 @@ public interface TaskScheduler extends EventSubscriber {
         return storage.write(new MutateWork.Quiet<Boolean>() {
           @Override
           public Boolean apply(MutableStoreProvider store) {
-            LOG.fine("Attempting to schedule task " + taskId);
-            final ITaskConfig task = Iterables.getOnlyElement(
-                Iterables.transform(
-                    store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
-                    Tasks.SCHEDULED_TO_INFO),
-                null);
-            if (task == null) {
-              LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
-            } else {
-              AttributeAggregate aggregate = getJobState(store, task.getJob());
-              try {
-                ResourceRequest resourceRequest = new ResourceRequest(task, taskId, aggregate);
-                if (!offerQueue.launchFirst(getAssignerFunction(store, resourceRequest)))
{
-                  // Task could not be scheduled.
-                  maybePreemptFor(taskId, aggregate);
-                  attemptsNoMatch.incrementAndGet();
-                  return false;
-                }
-              } catch (OfferQueue.LaunchException e) {
-                LOG.log(Level.WARNING, "Failed to launch task.", e);
-                attemptsFailed.incrementAndGet();
-
-                // The attempt to schedule the task failed, so we need to backpedal on the
-                // assignment.
-                // It is in the LOST state and a new task will move to PENDING to replace
it.
-                // Should the state change fail due to storage issues, that's okay.  The
task will
-                // time out in the ASSIGNED state and be moved to LOST.
-                stateManager.changeState(
-                    store,
-                    taskId,
-                    Optional.of(PENDING),
-                    LOST,
-                    LAUNCH_FAILED_MSG);
-              }
-            }
-
-            return true;
+            return scheduleTask(store, taskId);
           }
         });
       } catch (RuntimeException e) {
@@ -236,6 +200,47 @@ public interface TaskScheduler extends EventSubscriber {
       }
     }
 
+    @Timed("task_schedule_attempt_locked")
+    private boolean scheduleTask(MutableStoreProvider store, String taskId) {
+      LOG.fine("Attempting to schedule task " + taskId);
+      final ITaskConfig task = Iterables.getOnlyElement(
+          Iterables.transform(
+              store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+              Tasks.SCHEDULED_TO_INFO),
+          null);
+      if (task == null) {
+        LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+      } else {
+        AttributeAggregate aggregate = getJobState(store, task.getJob());
+        try {
+          ResourceRequest resourceRequest = new ResourceRequest(task, taskId, aggregate);
+          if (!offerQueue.launchFirst(getAssignerFunction(store, resourceRequest))) {
+            // Task could not be scheduled.
+            maybePreemptFor(taskId, aggregate);
+            attemptsNoMatch.incrementAndGet();
+            return false;
+          }
+        } catch (OfferQueue.LaunchException e) {
+          LOG.log(Level.WARNING, "Failed to launch task.", e);
+          attemptsFailed.incrementAndGet();
+
+          // The attempt to schedule the task failed, so we need to backpedal on the
+          // assignment.
+          // It is in the LOST state and a new task will move to PENDING to replace it.
+          // Should the state change fail due to storage issues, that's okay.  The task will
+          // time out in the ASSIGNED state and be moved to LOST.
+          stateManager.changeState(
+              store,
+              taskId,
+              Optional.of(PENDING),
+              LOST,
+              LAUNCH_FAILED_MSG);
+        }
+      }
+
+      return true;
+    }
+
     private void maybePreemptFor(String taskId, AttributeAggregate attributeAggregate) {
       if (reservations.hasReservationForTask(taskId)) {
         return;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 4821a78..1d8f012 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -227,6 +227,14 @@ public interface PubsubEvent {
     public int hashCode() {
       return Objects.hash(taskId, vetoes);
     }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this)
+          .add("taskId", taskId)
+          .add("vetoes", vetoes)
+          .toString();
+    }
   }
 
   class DriverRegistered implements PubsubEvent {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
index cc8c5b8..ecba276 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.filter;
 
 import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
@@ -37,21 +36,6 @@ final class ConstraintMatcher {
     // Utility class.
   }
 
-  @VisibleForTesting
-  static Veto limitVeto(String limit) {
-    return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
-  }
-
-  @VisibleForTesting
-  static Veto mismatchVeto(String constraint) {
-    return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
-  }
-
-  @VisibleForTesting
-  static Veto maintenanceVeto(String reason) {
-    return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
-  }
-
   private static final Function<IAttribute, Set<String>> GET_VALUES =
       new Function<IAttribute, Set<String>>() {
         @Override
@@ -92,11 +76,11 @@ final class ConstraintMatcher {
             taskConstraint.getValue());
         return matches
             ? Optional.<Veto>absent()
-            : Optional.of(mismatchVeto(constraint.getName()));
+            : Optional.of(Veto.constraintMismatch(constraint.getName()));
 
       case LIMIT:
         if (!attribute.isPresent()) {
-          return Optional.of(mismatchVeto(constraint.getName()));
+          return Optional.of(Veto.constraintMismatch(constraint.getName()));
         }
 
         boolean satisfied = AttributeFilter.matches(
@@ -105,7 +89,7 @@ final class ConstraintMatcher {
             cachedjobState);
         return satisfied
             ? Optional.<Veto>absent()
-            : Optional.of(limitVeto(constraint.getName()));
+            : Optional.of(Veto.unsatisfiedLimit(constraint.getName()));
 
       default:
         throw new SchedulerException("Failed to recognize the constraint type: "

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 723e7ab..c2a342c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -16,51 +16,115 @@ package org.apache.aurora.scheduler.filter;
 import java.util.Objects;
 import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.MAINTENANCE;
+
 /**
  * Determines whether a proposed scheduling assignment should be allowed.
  */
 public interface SchedulingFilter {
 
+  enum VetoType {
+    /**
+     * Not enough resources to satisfy a proposed scheduling assignment.
+     */
+    INSUFFICIENT_RESOURCES("Insufficient: %s"),
+
+    /**
+     * Unable to satisfy proposed scheduler assignment constraints.
+     */
+    CONSTRAINT_MISMATCH("Constraint not satisfied: %s"),
+
+    /**
+     * Constraint limit is not satisfied for a proposed scheduling assignment.
+     */
+    LIMIT_NOT_SATISFIED("Limit not satisfied: %s"),
+
+    /**
+     * Unable to satisfy a proposed scheduler assignment due to cluster maintenance.
+     */
+    MAINTENANCE("Host %s for maintenance");
+
+    private final String reasonFormat;
+
+    VetoType(String reasonFormat) {
+      this.reasonFormat = reasonFormat;
+    }
+
+    String formatReason(String reason) {
+      return String.format(reasonFormat, reason);
+    }
+  }
+
   /**
    * Reason for a proposed scheduling assignment to be filtered out.
    * A veto also contains a score, which is an opaque indicator as to how strong a veto is.
 This
    * is only intended to be used for relative ranking of vetoes for determining which veto
against
    * a scheduling assignment is 'weakest'.
    */
-  class Veto {
+  final class Veto {
     public static final int MAX_SCORE = 1000;
 
+    private final VetoType vetoType;
     private final String reason;
     private final int score;
-    private final boolean valueMismatch;
 
-    private Veto(String reason, int score, boolean valueMismatch) {
-      this.reason = reason;
+    private Veto(VetoType vetoType, String reasonParameter, int score) {
+      this.vetoType = vetoType;
+      this.reason = vetoType.formatReason(reasonParameter);
       this.score = Math.min(MAX_SCORE, score);
-      this.valueMismatch = valueMismatch;
-    }
-
-    @VisibleForTesting
-    public Veto(String reason, int score) {
-      this(reason, score, false);
     }
 
     /**
-     * Creates a special veto that represents a mismatch between the server and task's configuration
+     * Creates a veto that represents a mismatch between the server and task's configuration
      * for an attribute.
      *
-     * @param reason Information about the value mismatch.
+     * @param constraint A constraint name.
      * @return A constraint mismatch veto.
      */
-    public static Veto constraintMismatch(String reason) {
-      return new Veto(reason, MAX_SCORE, true);
+    public static Veto constraintMismatch(String constraint) {
+      return new Veto(CONSTRAINT_MISMATCH, constraint, MAX_SCORE);
+    }
+
+    /**
+     * Creates a veto the represents an unsatisfied constraint limit between the server and
task's
+     * configuration for an attribute.
+     *
+     * @param limit A constraint name.
+     * @return A unsatisfied limit veto.
+     */
+    public static Veto unsatisfiedLimit(String limit) {
+      return new Veto(LIMIT_NOT_SATISFIED, limit, MAX_SCORE);
+    }
+
+    /**
+     * Creates a veto that represents an inability of the server to satisfy task's configuration
+     * resource requirements.
+     *
+     * @param resource A resource name.
+     * @param score A veto score.
+     * @return An insufficient resources veto.
+     */
+    public static Veto insufficientResources(String resource, int score) {
+      return new Veto(INSUFFICIENT_RESOURCES, resource, score);
+    }
+
+    /**
+     * Creates a veto that represents a lack of suitable for assignment hosts due to cluster
+     * maintenance.
+     *
+     * @param maintenanceMode A maintenance mode.
+     * @return A maintenance veto.
+     */
+    public static Veto maintenance(String maintenanceMode) {
+      return new Veto(MAINTENANCE, maintenanceMode, MAX_SCORE);
     }
 
     public String getReason() {
@@ -71,8 +135,8 @@ public interface SchedulingFilter {
       return score;
     }
 
-    public boolean isConstraintMismatch() {
-      return valueMismatch;
+    public VetoType getVetoType() {
+      return vetoType;
     }
 
     @Override
@@ -82,22 +146,22 @@ public interface SchedulingFilter {
       }
 
       Veto other = (Veto) o;
-      return Objects.equals(reason, other.reason)
-          && Objects.equals(score, other.score)
-          && Objects.equals(valueMismatch, other.valueMismatch);
+      return Objects.equals(vetoType, other.vetoType)
+          && Objects.equals(reason, other.reason)
+          && Objects.equals(score, other.score);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(reason, score, valueMismatch);
+      return Objects.hash(vetoType, reason, score);
     }
 
     @Override
     public String toString() {
       return com.google.common.base.Objects.toStringHelper(this)
+          .add("vetoType", vetoType)
           .add("reason", reason)
           .add("score", score)
-          .add("valueMismatch", valueMismatch)
           .toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index cf3bb64..1cb56f1 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -92,7 +92,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
     @VisibleForTesting
     Veto veto(double excess) {
-      return new Veto("Insufficient " + name, scale(excess, range));
+      return Veto.insufficientResources(name, scale(excess, range));
     }
   }
 
@@ -155,7 +155,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
     return VETO_MODES.contains(mode)
-        ? Optional.of(ConstraintMatcher.maintenanceVeto(mode.toString().toLowerCase()))
+        ? Optional.of(Veto.maintenance(mode.toString().toLowerCase()))
         : NO_VETO;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
index 8720369..c3097e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -36,6 +36,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 
 /**
@@ -107,14 +108,14 @@ public class NearestFit implements EventSubscriber {
   private static final Predicate<Veto> IS_CONSTRAINT_MISMATCH = new Predicate<Veto>()
{
     @Override
     public boolean apply(Veto veto) {
-      return veto.isConstraintMismatch();
+      return veto.getVetoType() == SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
     }
   };
 
   /**
    * Records a task veto event.
-   * This will ignore any veto events where any veto returns {@code true} from
-   * {@link Veto#isConstraintMismatch()}.
+   * This will ignore any veto events with a type of
+   * {@link SchedulingFilter.VetoType#CONSTRAINT_MISMATCH}
    *
    * @param vetoEvent Veto event.
    */

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java b/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
index aaedb3b..4a3bed3 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
@@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -25,11 +26,12 @@ import com.twitter.common.stats.StatsProvider;
 /**
  * A cache of stats, allowing counters to be fetched and reused based on their names.
  */
-class CachedCounters {
+public class CachedCounters {
   private final LoadingCache<String, AtomicLong> cache;
 
+  @VisibleForTesting
   @Inject
-  CachedCounters(final StatsProvider stats) {
+  public CachedCounters(final StatsProvider stats) {
     cache = CacheBuilder.newBuilder().build(
         new CacheLoader<String, AtomicLong>() {
           @Override
@@ -40,7 +42,7 @@ class CachedCounters {
     );
   }
 
-  AtomicLong get(String name) {
+  public AtomicLong get(String name) {
     return cache.getUnchecked(name);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index 12ea4c6..4e7efb3 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -32,8 +32,10 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -50,6 +52,9 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.TaskVars.VETO_DYNAMIC_NAME;
+import static org.apache.aurora.scheduler.TaskVars.VETO_MIXED_NAME;
+import static org.apache.aurora.scheduler.TaskVars.VETO_STATIC_NAME;
 import static org.apache.aurora.scheduler.TaskVars.jobStatName;
 import static org.apache.aurora.scheduler.TaskVars.rackStatName;
 import static org.easymock.EasyMock.expect;
@@ -114,6 +119,12 @@ public class TaskVarsTest extends EasyMockTest {
         task.getStatus()));
   }
 
+  private void applyVeto(IScheduledTask task, Veto... vetoes) {
+    vars.taskVetoed(new PubsubEvent.Vetoed(
+        task.getAssignedTask().getTaskId(),
+        ImmutableSet.copyOf(vetoes)));
+  }
+
   private void schedulerActivated(IScheduledTask... initialTasks) {
     for (IScheduledTask task : initialTasks) {
       vars.taskChangedState(TaskStateChange.initialized(task));
@@ -206,6 +217,49 @@ public class TaskVarsTest extends EasyMockTest {
   }
 
   @Test
+  public void testStaticVetoGroup() {
+    expectStatusCountersInitialized();
+    expectStatExport(VETO_STATIC_NAME);
+
+    replayAndBuild();
+    schedulerActivated();
+
+    applyVeto(
+        makeTask(JOB_A, PENDING),
+        Veto.insufficientResources("ram", 500),
+        Veto.insufficientResources("cpu", 500));
+
+    assertEquals(1, getValue(VETO_STATIC_NAME));
+  }
+
+  @Test
+  public void testDynamicVetoGroup() {
+    expectStatusCountersInitialized();
+    expectStatExport(VETO_DYNAMIC_NAME);
+
+    replayAndBuild();
+    schedulerActivated();
+
+    applyVeto(makeTask(JOB_A, PENDING), Veto.unsatisfiedLimit("constraint"));
+    assertEquals(1, getValue(VETO_DYNAMIC_NAME));
+  }
+
+  @Test
+  public void testMixedVetoGroup() {
+    expectStatusCountersInitialized();
+    expectStatExport(VETO_MIXED_NAME);
+
+    replayAndBuild();
+    schedulerActivated();
+
+    applyVeto(makeTask(JOB_A, PENDING),
+        Veto.unsatisfiedLimit("constraint"),
+        Veto.insufficientResources("ram", 500));
+
+    assertEquals(1, getValue(VETO_MIXED_NAME));
+  }
+
+  @Test
   public void testLoadsFromStorage() {
     expectStatusCountersInitialized();
     expectGetHostRack("hostA", "rackA").atLeastOnce();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index d408ec0..0b41156 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -51,8 +51,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
   private ResourceRequest request;
 
-  private static final Veto VETO_1 = new Veto("veto1", 1);
-  private static final Veto VETO_2 = new Veto("veto2", 2);
+  private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
+  private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
 
   private SchedulingFilter filter;
   private EventSink eventSink;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index d1a7066..b60dd9c 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -56,8 +56,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.filter.ConstraintMatcher.limitVeto;
-import static org.apache.aurora.scheduler.filter.ConstraintMatcher.mismatchVeto;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.DEDICATED_HOST_VETO;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
@@ -270,9 +268,10 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testHostDrainingForMaintenance() {
     control.replay();
 
-    assertVetoes(makeTask(),
+    assertVetoes(
+        makeTask(),
         hostAttributes(HOST_A, MaintenanceMode.DRAINING, host(HOST_A), rack(RACK_A)),
-        ConstraintMatcher.maintenanceVeto("draining"));
+        Veto.maintenance("draining"));
   }
 
   @Test
@@ -282,7 +281,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(),
         hostAttributes(HOST_A, MaintenanceMode.DRAINED, host(HOST_A), rack(RACK_A)),
-        ConstraintMatcher.maintenanceVeto("drained"));
+        Veto.maintenance("drained"));
   }
 
   @Test
@@ -295,7 +294,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(OWNER_A, JOB_A, constraint1, constraint2),
         hostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A)),
-        mismatchVeto(DEDICATED_ATTRIBUTE));
+        Veto.constraintMismatch(DEDICATED_ATTRIBUTE));
     assertNoVetoes(
         makeTask(OWNER_B, JOB_B, constraint1, constraint2),
         hostAttributes(HOST_B, dedicated("xxx"), host(HOST_A)));
@@ -312,11 +311,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(OWNER_A, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
         hostAttributes(HOST_A, host(HOST_A)),
-        mismatchVeto(DEDICATED_ATTRIBUTE));
+        Veto.constraintMismatch(DEDICATED_ATTRIBUTE));
     assertVetoes(
         makeTask(OWNER_B, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
         hostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B)),
-        mismatchVeto(DEDICATED_ATTRIBUTE));
+        Veto.constraintMismatch(DEDICATED_ATTRIBUTE));
   }
 
   @Test
@@ -366,24 +365,24 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         hostLimitTask(OWNER_A, JOB_A, 1),
         hostB,
         stateA,
-        limitVeto(HOST_ATTRIBUTE));
+        Veto.unsatisfiedLimit(HOST_ATTRIBUTE));
     assertVetoes(
         hostLimitTask(OWNER_A, JOB_A, 2),
         hostB,
         stateA,
-        limitVeto(HOST_ATTRIBUTE));
+        Veto.unsatisfiedLimit(HOST_ATTRIBUTE));
     assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), hostB, stateA);
 
     assertVetoes(
         rackLimitTask(OWNER_B, JOB_A, 2),
         hostB,
         stateB,
-        limitVeto(RACK_ATTRIBUTE));
+        Veto.unsatisfiedLimit(RACK_ATTRIBUTE));
     assertVetoes(
         rackLimitTask(OWNER_B, JOB_A, 3),
         hostB,
         stateB,
-        limitVeto(RACK_ATTRIBUTE));
+        Veto.unsatisfiedLimit(RACK_ATTRIBUTE));
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), hostB, stateB);
 
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), hostC, stateB);
@@ -392,7 +391,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         rackLimitTask(OWNER_A, JOB_A, 1),
         hostC,
         stateA,
-        limitVeto(RACK_ATTRIBUTE));
+        Veto.unsatisfiedLimit(RACK_ATTRIBUTE));
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), hostC, stateB);
   }
 
@@ -460,7 +459,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
         hostA,
-        mismatchVeto("jvm"));
+        Veto.constraintMismatch("jvm"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java b/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
index b60b004..78a236c 100644
--- a/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
@@ -35,9 +35,9 @@ import static org.junit.Assert.assertEquals;
 
 public class NearestFitTest {
 
-  private static final Veto ALMOST = new Veto("Almost", 1);
-  private static final Veto NOPE = new Veto("Nope", 5);
-  private static final Veto NO_CHANCE = new Veto("No chance", 1000);
+  private static final Veto ALMOST = Veto.insufficientResources("Almost", 1);
+  private static final Veto NOPE = Veto.insufficientResources("Nope", 5);
+  private static final Veto NO_CHANCE = Veto.insufficientResources("No chance", 1000);
   private static final Veto KERNEL = Veto.constraintMismatch("2.6.39");
 
   private static final String TASK = "taskId";

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ada97bdb/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 168290a..de5f21a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -2012,7 +2012,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     Set<PendingReason> expected = ImmutableSet.of(new PendingReason()
         .setTaskId(taskId)
-        .setReason("first,second"));
+        .setReason("Constraint not satisfied: first,Constraint not satisfied: second"));
 
     Response response = assertOkResponse(thrift.getPendingReason(query.get()));
     assertEquals(expected, response.getResult().getGetPendingReasonResult().getReasons());


Mime
View raw message