Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4F55A10435 for ; Fri, 27 Feb 2015 20:09:36 +0000 (UTC) Received: (qmail 18998 invoked by uid 500); 27 Feb 2015 20:09:36 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 18964 invoked by uid 500); 27 Feb 2015 20:09:36 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 18955 invoked by uid 99); 27 Feb 2015 20:09:36 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Feb 2015 20:09:36 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 27 Feb 2015 20:09:33 +0000 Received: (qmail 18845 invoked by uid 99); 27 Feb 2015 20:09:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Feb 2015 20:09:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C780EE0531; Fri, 27 Feb 2015 20:09:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.incubator.apache.org Message-Id: <5c4059b5100e4de390d4a3024dc03701@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-aurora git commit: Expose more details about the tasks the preemptor is working for. Date: Fri, 27 Feb 2015 20:09:13 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master d1a129659 -> 443f66867 Expose more details about the tasks the preemptor is working for. Bugs closed: AURORA-524 Reviewed at https://reviews.apache.org/r/31445/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/443f6686 Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/443f6686 Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/443f6686 Branch: refs/heads/master Commit: 443f66867979ac0eb14943fce6d1c5109f714212 Parents: d1a1296 Author: Bill Farner Authored: Fri Feb 27 12:05:11 2015 -0800 Committer: Bill Farner Committed: Fri Feb 27 12:08:14 2015 -0800 ---------------------------------------------------------------------- .../async/preemptor/PreemptorImpl.java | 107 ++++++++++++++----- .../async/preemptor/PreemptorImplTest.java | 98 ++++++++++++++++- .../scheduler/testing/FakeStatsProvider.java | 10 ++ 3 files changed, 185 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java index a4e8dd3..833a3e0 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async.preemptor; import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -29,13 +28,12 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.Clock; @@ -51,6 +49,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.mesos.ExecutorSettings; import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -86,11 +85,71 @@ public class PreemptorImpl implements Preemptor { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) public @interface PreemptionDelay { } - private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted"); - // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks - private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts"); - // Incremented every time we fail to find tasks to preempt for a pending task. - private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found"); + @VisibleForTesting + static class Metrics { + private volatile boolean exported = false; + private final CachedCounters counters; + + Metrics(CachedCounters counters) { + this.counters = requireNonNull(counters); + } + + private static String name(boolean production) { + return production ? "prod" : "non_prod"; + } + + private void assertFullyExported() { + if (exported) { + return; + } + + // Dummy-read all stats to ensure they are exported. + Set allStats = ImmutableSet.of( + attemptsStatName(false), + attemptsStatName(true), + successStatName(false), + successStatName(true), + failureStatName(false), + failureStatName(true)); + for (String stat : allStats) { + counters.get(stat); + } + + exported = true; + } + + private void increment(String stat) { + assertFullyExported(); + counters.get(stat).incrementAndGet(); + } + + @VisibleForTesting + static String attemptsStatName(boolean production) { + return "preemptor_attempts_for_" + name(production); + } + + void recordPreemptionAttemptFor(ITaskConfig task) { + increment(attemptsStatName(task.isProduction())); + } + + @VisibleForTesting + static String successStatName(boolean production) { + return "preemptor_tasks_preempted_" + name(production); + } + + void recordTaskPreemption(PreemptionVictim victim) { + increment(successStatName(victim.isProduction())); + } + + @VisibleForTesting + static String failureStatName(boolean production) { + return "preemptor_no_slots_found_for_" + name(production); + } + + void recordPreemptionFailure(ITaskConfig task) { + increment(failureStatName(task.isProduction())); + } + } private final Predicate isIdleTask = new Predicate() { @Override @@ -109,6 +168,7 @@ public class PreemptorImpl implements Preemptor { private final AtomicLong missingAttributes; private final ClusterState clusterState; private final ExecutorSettings executorSettings; + private final Metrics metrics; /** * Creates a new preemptor. @@ -142,6 +202,7 @@ public class PreemptorImpl implements Preemptor { missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes"); this.clusterState = requireNonNull(clusterState); this.executorSettings = requireNonNull(executorSettings); + this.metrics = new Metrics(new CachedCounters(statsProvider)); } private static final Function OFFER_TO_RESOURCE_SLOT = @@ -176,14 +237,6 @@ public class PreemptorImpl implements Preemptor { } }; - private static final Function VICTIM_TO_TASK_ID = - new Function() { - @Override - public String apply(PreemptionVictim victim) { - return victim.getTaskId(); - } - }; - // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector // ordering private final Ordering resourceOrder = @@ -202,7 +255,7 @@ public class PreemptorImpl implements Preemptor { * The empty set indicates the offers (slack) are enough. * A set with elements indicates those tasks and the offers are enough. */ - private Optional> getTasksToPreempt( + private Optional> getTasksToPreempt( Iterable possibleVictims, Iterable offers, IAssignedTask pendingTask, @@ -232,7 +285,7 @@ public class PreemptorImpl implements Preemptor { new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState)); if (vetoes.isEmpty()) { - return Optional.>of(ImmutableSet.of()); + return Optional.>of(ImmutableSet.of()); } } @@ -243,7 +296,7 @@ public class PreemptorImpl implements Preemptor { return Optional.absent(); } - List toPreemptTasks = Lists.newArrayList(); + Set toPreemptTasks = Sets.newHashSet(); Iterable sortedVictims = resourceOrder.immutableSortedCopy(preemptableTasks); @@ -265,9 +318,7 @@ public class PreemptorImpl implements Preemptor { new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState)); if (vetoes.isEmpty()) { - Set taskIds = - FluentIterable.from(toPreemptTasks).transform(VICTIM_TO_TASK_ID).toSet(); - return Optional.of(taskIds); + return Optional.>of(ImmutableSet.copyOf(toPreemptTasks)); } } return Optional.absent(); @@ -317,7 +368,7 @@ public class PreemptorImpl implements Preemptor { return Optional.absent(); } - attemptedPreemptions.incrementAndGet(); + metrics.recordPreemptionAttemptFor(pendingTask.get().getTask()); // Group the offers by slave id so they can be paired with active tasks from the same slave. Multimap slavesToOffers = @@ -329,7 +380,7 @@ public class PreemptorImpl implements Preemptor { .build(); for (String slaveID : allSlaves) { - final Optional> toPreemptTasks = getTasksToPreempt( + final Optional> toPreemptTasks = getTasksToPreempt( slavesToActiveTasks.get(slaveID), slavesToOffers.get(slaveID), pendingTask.get(), @@ -339,14 +390,14 @@ public class PreemptorImpl implements Preemptor { storage.write(new Storage.MutateWork.NoResult.Quiet() { @Override protected void execute(Storage.MutableStoreProvider storeProvider) { - for (String toPreempt : toPreemptTasks.get()) { + for (PreemptionVictim toPreempt : toPreemptTasks.get()) { + metrics.recordTaskPreemption(toPreempt); stateManager.changeState( storeProvider, - toPreempt, + toPreempt.getTaskId(), Optional.absent(), PREEMPTING, Optional.of("Preempting in favor of " + taskId)); - tasksPreempted.incrementAndGet(); } } }); @@ -354,7 +405,7 @@ public class PreemptorImpl implements Preemptor { } } - noSlotsFound.incrementAndGet(); + metrics.recordPreemptionFailure(pendingTask.get().getTask()); return Optional.absent(); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index 44cd8f7..2845b3f 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -30,7 +30,6 @@ import com.google.common.collect.Multimaps; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.testing.FakeClock; @@ -72,11 +71,15 @@ import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.attemptsStatName; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.failureStatName; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.Metrics.successStatName; import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import static org.apache.mesos.Protos.Offer; import static org.apache.mesos.Protos.Resource; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { @@ -102,7 +105,7 @@ public class PreemptorImplTest extends EasyMockTest { private StateManager stateManager; private SchedulingFilter schedulingFilter; private FakeClock clock; - private StatsProvider statsProvider; + private FakeStatsProvider statsProvider; private ClusterState clusterState; private OfferManager offerManager; private AttributeAggregate emptyJob; @@ -190,6 +193,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(highPriority); + + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -216,6 +226,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(highPriority); + + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -245,6 +262,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(pendingPriority); + + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -263,6 +287,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(task); + + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(1L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -287,6 +318,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -311,6 +349,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -329,6 +374,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(1L, statsProvider.getLongValue(failureStatName(true))); } // Ensures a production task can preempt 2 tasks on the same host. @@ -361,6 +413,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(2L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures we select the minimal number of tasks to preempt @@ -396,6 +455,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures a production task *never* preempts a production task from another job. @@ -421,6 +487,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p2); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(1L, statsProvider.getLongValue(failureStatName(true))); } // Ensures that we can preempt if a task + offer can satisfy a pending task. @@ -449,6 +522,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures we can preempt if two tasks and an offer can satisfy a pending task. @@ -482,6 +562,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(2L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures we don't preempt if a host has enough slack to satisfy a pending task. @@ -508,6 +595,13 @@ public class PreemptorImplTest extends EasyMockTest { control.replay(); runPreemptor(p1); + + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(successStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); + assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails. http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/443f6686/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java index 768e784..8c0a5e6 100644 --- a/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java +++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeStatsProvider.java @@ -55,6 +55,16 @@ public class FakeStatsProvider implements StatsProvider { })); } + /** + * Gets the value of a stat as a long. + * + * @param name Stat name. + * @return Value, as a long. + */ + public long getLongValue(String name) { + return stats.get(name).get().longValue(); + } + @Override public AtomicLong makeCounter(String name) { final AtomicLong counter = new AtomicLong();