aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject incubator-aurora git commit: Expose more details about the tasks the preemptor is working for.
Date Fri, 27 Feb 2015 20:09:13 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master d1a129659 -> 443f66867


Expose more details about the tasks the preemptor is working for.

Bugs closed: AURORA-524

Reviewed at https://reviews.apache.org/r/31445/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/443f6686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/443f6686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/443f6686

Branch: refs/heads/master
Commit: 443f66867979ac0eb14943fce6d1c5109f714212
Parents: d1a1296
Author: Bill Farner <wfarner@apache.org>
Authored: Fri Feb 27 12:05:11 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Fri Feb 27 12:08:14 2015 -0800

----------------------------------------------------------------------
 .../async/preemptor/PreemptorImpl.java          | 107 ++++++++++++++-----
 .../async/preemptor/PreemptorImplTest.java      |  98 ++++++++++++++++-
 .../scheduler/testing/FakeStatsProvider.java    |  10 ++
 3 files changed, 185 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
index a4e8dd3..833a3e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async.preemptor;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -29,13 +28,12 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
@@ -51,6 +49,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -86,11 +85,71 @@ public class PreemptorImpl implements Preemptor {
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   public @interface PreemptionDelay { }
 
-  private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
-  // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
-  private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
-  // Incremented every time we fail to find tasks to preempt for a pending task.
-  private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
+  @VisibleForTesting
+  static class Metrics {
+    private volatile boolean exported = false;
+    private final CachedCounters counters;
+
+    Metrics(CachedCounters counters) {
+      this.counters = requireNonNull(counters);
+    }
+
+    private static String name(boolean production) {
+      return production ? "prod" : "non_prod";
+    }
+
+    private void assertFullyExported() {
+      if (exported) {
+        return;
+      }
+
+      // Dummy-read all stats to ensure they are exported.
+      Set<String> allStats = ImmutableSet.of(
+          attemptsStatName(false),
+          attemptsStatName(true),
+          successStatName(false),
+          successStatName(true),
+          failureStatName(false),
+          failureStatName(true));
+      for (String stat : allStats) {
+        counters.get(stat);
+      }
+
+      exported = true;
+    }
+
+    private void increment(String stat) {
+      assertFullyExported();
+      counters.get(stat).incrementAndGet();
+    }
+
+    @VisibleForTesting
+    static String attemptsStatName(boolean production) {
+      return "preemptor_attempts_for_" + name(production);
+    }
+
+    void recordPreemptionAttemptFor(ITaskConfig task) {
+      increment(attemptsStatName(task.isProduction()));
+    }
+
+    @VisibleForTesting
+    static String successStatName(boolean production) {
+      return "preemptor_tasks_preempted_" + name(production);
+    }
+
+    void recordTaskPreemption(PreemptionVictim victim) {
+      increment(successStatName(victim.isProduction()));
+    }
+
+    @VisibleForTesting
+    static String failureStatName(boolean production) {
+      return "preemptor_no_slots_found_for_" + name(production);
+    }
+
+    void recordPreemptionFailure(ITaskConfig task) {
+      increment(failureStatName(task.isProduction()));
+    }
+  }
 
   private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
     @Override
@@ -109,6 +168,7 @@ public class PreemptorImpl implements Preemptor {
   private final AtomicLong missingAttributes;
   private final ClusterState clusterState;
   private final ExecutorSettings executorSettings;
+  private final Metrics metrics;
 
   /**
    * Creates a new preemptor.
@@ -142,6 +202,7 @@ public class PreemptorImpl implements Preemptor {
     missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
     this.clusterState = requireNonNull(clusterState);
     this.executorSettings = requireNonNull(executorSettings);
+    this.metrics = new Metrics(new CachedCounters(statsProvider));
   }
 
   private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
@@ -176,14 +237,6 @@ public class PreemptorImpl implements Preemptor {
         }
       };
 
-  private static final Function<PreemptionVictim, String> VICTIM_TO_TASK_ID =
-      new Function<PreemptionVictim, String>() {
-        @Override
-        public String apply(PreemptionVictim victim) {
-          return victim.getTaskId();
-        }
-      };
-
   // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
   // ordering
   private final Ordering<PreemptionVictim> resourceOrder =
@@ -202,7 +255,7 @@ public class PreemptorImpl implements Preemptor {
    * The empty set indicates the offers (slack) are enough.
    * A set with elements indicates those tasks and the offers are enough.
    */
-  private Optional<Set<String>> getTasksToPreempt(
+  private Optional<Set<PreemptionVictim>> getTasksToPreempt(
       Iterable<PreemptionVictim> possibleVictims,
       Iterable<HostOffer> offers,
       IAssignedTask pendingTask,
@@ -232,7 +285,7 @@ public class PreemptorImpl implements Preemptor {
           new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
 
       if (vetoes.isEmpty()) {
-        return Optional.<Set<String>>of(ImmutableSet.<String>of());
+        return Optional.<Set<PreemptionVictim>>of(ImmutableSet.<PreemptionVictim>of());
       }
     }
 
@@ -243,7 +296,7 @@ public class PreemptorImpl implements Preemptor {
       return Optional.absent();
     }
 
-    List<PreemptionVictim> toPreemptTasks = Lists.newArrayList();
+    Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
 
     Iterable<PreemptionVictim> sortedVictims = resourceOrder.immutableSortedCopy(preemptableTasks);
 
@@ -265,9 +318,7 @@ public class PreemptorImpl implements Preemptor {
           new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
 
       if (vetoes.isEmpty()) {
-        Set<String> taskIds =
-            FluentIterable.from(toPreemptTasks).transform(VICTIM_TO_TASK_ID).toSet();
-        return Optional.of(taskIds);
+        return Optional.<Set<PreemptionVictim>>of(ImmutableSet.copyOf(toPreemptTasks));
       }
     }
     return Optional.absent();
@@ -317,7 +368,7 @@ public class PreemptorImpl implements Preemptor {
       return Optional.absent();
     }
 
-    attemptedPreemptions.incrementAndGet();
+    metrics.recordPreemptionAttemptFor(pendingTask.get().getTask());
 
     // Group the offers by slave id so they can be paired with active tasks from the same slave.
     Multimap<String, HostOffer> slavesToOffers =
@@ -329,7 +380,7 @@ public class PreemptorImpl implements Preemptor {
         .build();
 
     for (String slaveID : allSlaves) {
-      final Optional<Set<String>> toPreemptTasks = getTasksToPreempt(
+      final Optional<Set<PreemptionVictim>> toPreemptTasks = getTasksToPreempt(
           slavesToActiveTasks.get(slaveID),
           slavesToOffers.get(slaveID),
           pendingTask.get(),
@@ -339,14 +390,14 @@ public class PreemptorImpl implements Preemptor {
         storage.write(new Storage.MutateWork.NoResult.Quiet() {
           @Override
           protected void execute(Storage.MutableStoreProvider storeProvider) {
-            for (String toPreempt : toPreemptTasks.get()) {
+            for (PreemptionVictim toPreempt : toPreemptTasks.get()) {
+              metrics.recordTaskPreemption(toPreempt);
               stateManager.changeState(
                   storeProvider,
-                  toPreempt,
+                  toPreempt.getTaskId(),
                   Optional.<ScheduleStatus>absent(),
                   PREEMPTING,
                   Optional.of("Preempting in favor of " + taskId));
-              tasksPreempted.incrementAndGet();
             }
           }
         });
@@ -354,7 +405,7 @@ public class PreemptorImpl implements Preemptor {
       }
     }
 
-    noSlotsFound.incrementAndGet();
+    metrics.recordPreemptionFailure(pendingTask.get().getTask());
     return Optional.absent();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 44cd8f7..2845b3f 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Multimaps;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
@@ -72,11 +71,15 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.attemptsStatName;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.failureStatName;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.successStatName;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
 
@@ -102,7 +105,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
-  private StatsProvider statsProvider;
+  private FakeStatsProvider statsProvider;
   private ClusterState clusterState;
   private OfferManager offerManager;
   private AttributeAggregate emptyJob;
@@ -190,6 +193,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(highPriority);
+
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -216,6 +226,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(highPriority);
+
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -245,6 +262,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(pendingPriority);
+
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -263,6 +287,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(task);
+
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -287,6 +318,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -311,6 +349,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -329,6 +374,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures a production task can preempt 2 tasks on the same host.
@@ -361,6 +413,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures we select the minimal number of tasks to preempt
@@ -396,6 +455,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures a production task *never* preempts a production task from another job.
@@ -421,6 +487,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p2);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
@@ -449,6 +522,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
@@ -482,6 +562,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
@@ -508,6 +595,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     control.replay();
     runPreemptor(p1);
+
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
index 768e784..8c0a5e6 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java
@@ -55,6 +55,16 @@ public class FakeStatsProvider implements StatsProvider {
       }));
   }
 
+  /**
+   * Gets the value of a stat as a long.
+   *
+   * @param name Stat name.
+   * @return Value, as a long.
+   */
+  public long getLongValue(String name) {
+    return stats.get(name).get().longValue();
+  }
+
   @Override
   public AtomicLong makeCounter(String name) {
     final AtomicLong counter = new AtomicLong();


Mime
View raw message