Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CEFE01142A for ; Fri, 5 Sep 2014 22:18:19 +0000 (UTC) Received: (qmail 60204 invoked by uid 500); 5 Sep 2014 22:18:19 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 60172 invoked by uid 500); 5 Sep 2014 22:18:19 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 60160 invoked by uid 99); 5 Sep 2014 22:18:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Sep 2014 22:18:19 +0000 X-ASF-Spam-Status: No, hits=-2001.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 05 Sep 2014 22:17:55 +0000 Received: (qmail 60094 invoked by uid 99); 5 Sep 2014 22:17:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Sep 2014 22:17:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A21EDA0B730; Fri, 5 Sep 2014 22:17:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: Adding support for per-job task status metrics. Date: Fri, 5 Sep 2014 22:17:52 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 61e0bc667 -> 019ca28f3 Adding support for per-job task status metrics. Bugs closed: AURORA-685 Reviewed at https://reviews.apache.org/r/25357/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/019ca28f Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/019ca28f Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/019ca28f Branch: refs/heads/master Commit: 019ca28f363e6f64a02a534f8ddcbfad8f891380 Parents: 61e0bc6 Author: Maxim Khutornenko Authored: Fri Sep 5 15:17:29 2014 -0700 Committer: Maxim Khutornenko Committed: Fri Sep 5 15:17:29 2014 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/TaskVars.java | 66 +++++++++++++++----- .../apache/aurora/scheduler/TaskVarsTest.java | 62 ++++++++++++------ 2 files changed, 94 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/019ca28f/src/main/java/org/apache/aurora/scheduler/TaskVars.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java index 6654c16..f1ab934 100644 --- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java +++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java @@ -29,11 +29,13 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.twitter.common.stats.StatsProvider; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -52,8 +54,11 @@ import static java.util.Objects.requireNonNull; */ class TaskVars implements EventSubscriber { private static final Logger LOG = Logger.getLogger(TaskVars.class.getName()); + private static final ImmutableSet TRACKED_JOB_STATES = + ImmutableSet.of(ScheduleStatus.LOST, ScheduleStatus.FAILED); private final LoadingCache counters; + private final LoadingCache untrackedCounters; private final Storage storage; private volatile boolean exporting = false; @@ -61,10 +66,15 @@ class TaskVars implements EventSubscriber { TaskVars(Storage storage, final StatsProvider statProvider) { this.storage = requireNonNull(storage); requireNonNull(statProvider); - counters = CacheBuilder.newBuilder().build(new CacheLoader() { + counters = buildCache(statProvider); + untrackedCounters = buildCache(statProvider.untracked()); + } + + private LoadingCache buildCache(final StatsProvider provider) { + return CacheBuilder.newBuilder().build(new CacheLoader() { @Override public Counter load(String statName) { - Counter counter = new Counter(statProvider); + Counter counter = new Counter(provider); if (exporting) { counter.exportAs(statName); } @@ -83,6 +93,14 @@ class TaskVars implements EventSubscriber { return "tasks_lost_rack_" + rack; } + @VisibleForTesting + static String jobStatName(IScheduledTask task, ScheduleStatus status) { + return String.format( + "tasks_%s_%s", + status, + JobKeys.canonicalString(JobKeys.from(task.getAssignedTask().getTask()))); + } + private static final Predicate IS_RACK = new Predicate() { @Override public boolean apply(IAttribute attr) { @@ -110,13 +128,10 @@ class TaskVars implements EventSubscriber { getCounter(status).decrement(); } - @Subscribe - public void taskChangedState(TaskStateChange stateChange) { - IScheduledTask task = stateChange.getTask(); - Optional previousState = stateChange.getOldState(); - final String host = stateChange.getTask().getAssignedTask().getSlaveHost(); + private void updateRackCounters(IScheduledTask task, ScheduleStatus newState) { + final String host = task.getAssignedTask().getSlaveHost(); Optional rack; - if (Strings.isNullOrEmpty(stateChange.getTask().getAssignedTask().getSlaveHost())) { + if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) { rack = Optional.absent(); } else { rack = storage.consistentRead(new Work.Quiet>() { @@ -136,13 +151,7 @@ class TaskVars implements EventSubscriber { counters.getUnchecked(rackStatName(rack.get())); } - if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) { - decrementCount(previousState.get()); - } - - incrementCount(task.getStatus()); - - if (stateChange.getNewState() == ScheduleStatus.LOST) { + if (newState == ScheduleStatus.LOST) { if (rack.isPresent()) { counters.getUnchecked(rackStatName(rack.get())).increment(); } else { @@ -151,6 +160,26 @@ class TaskVars implements EventSubscriber { } } + private void updateJobCounters(IScheduledTask task, ScheduleStatus newState) { + if (TRACKED_JOB_STATES.contains(newState)) { + untrackedCounters.getUnchecked(jobStatName(task, newState)).increment(); + } + } + + @Subscribe + public void taskChangedState(TaskStateChange stateChange) { + IScheduledTask task = stateChange.getTask(); + Optional previousState = stateChange.getOldState(); + + if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) { + decrementCount(previousState.get()); + } + incrementCount(task.getStatus()); + + updateRackCounters(task, task.getStatus()); + updateJobCounters(task, task.getStatus()); + } + @Subscribe public void schedulerActive(SchedulerActive event) { // Dummy read the counter for each status counter. This is important to guarantee a stat with @@ -160,10 +189,15 @@ class TaskVars implements EventSubscriber { getCounter(status); } + exportCounters(counters.asMap()); + exportCounters(untrackedCounters.asMap()); + } + + private void exportCounters(Map counterMap) { // Initiate export of all counters. This is not done initially to avoid exporting values that // do not represent the entire storage contents. exporting = true; - for (Map.Entry entry : counters.asMap().entrySet()) { + for (Map.Entry entry : counterMap.entrySet()) { entry.getValue().exportAs(entry.getKey()); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/019ca28f/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java index d02714c..371ae87 100644 --- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java @@ -50,6 +50,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.TaskVars.jobStatName; import static org.apache.aurora.scheduler.TaskVars.rackStatName; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; @@ -61,24 +62,35 @@ public class TaskVarsTest extends EasyMockTest { private static final String JOB_A = "job_a"; private static final String JOB_B = "job_b"; private static final String TASK_ID = "task_id"; + private static final String ENV = "test"; private StorageTestUtil storageUtil; - private StatsProvider trackedStats; + private StatsProvider trackedProvider; + private StatsProvider untrackedProvider; private TaskVars vars; private Map> globalCounters; @Before public void setUp() { storageUtil = new StorageTestUtil(this); - trackedStats = createMock(StatsProvider.class); - vars = new TaskVars(storageUtil.storage, trackedStats); - + trackedProvider = createMock(StatsProvider.class); + untrackedProvider = createMock(StatsProvider.class); + expect(trackedProvider.untracked()).andReturn(untrackedProvider); storageUtil.expectOperations(); globalCounters = Maps.newHashMap(); } - private void expectStatExport(final String name) { - expect(trackedStats.makeGauge(EasyMock.eq(name), EasyMock.>anyObject())) + private void replayAndBuild() { + control.replay(); + vars = new TaskVars(storageUtil.storage, trackedProvider); + } + + private void expectStatExport(String name) { + expectStatExport(name, trackedProvider); + } + + private void expectStatExport(final String name, StatsProvider provider) { + expect(provider.makeGauge(EasyMock.eq(name), EasyMock.>anyObject())) .andAnswer(new IAnswer>() { @SuppressWarnings("unchecked") @Override @@ -116,6 +128,7 @@ public class TaskVarsTest extends EasyMockTest { .setTaskId(TASK_ID) .setTask(new TaskConfig() .setJobName(job) + .setEnvironment(ENV) .setOwner(new Identity(ROLE_A, ROLE_A + "-user")))); if (Tasks.SLAVE_ASSIGNED_STATES.contains(status) || Tasks.isTerminated(status)) { task.getAssignedTask().setSlaveHost(host); @@ -138,7 +151,7 @@ public class TaskVarsTest extends EasyMockTest { public void testStartsAtZero() { expectStatusCountersInitialized(); - control.replay(); + replayAndBuild(); schedulerActivated(); assertAllZero(); @@ -146,10 +159,9 @@ public class TaskVarsTest extends EasyMockTest { @Test public void testNoEarlyExport() { - control.replay(); + replayAndBuild(); // No variables should be exported since schedulerActive is never called. - vars = new TaskVars(storageUtil.storage, trackedStats); IScheduledTask taskA = makeTask(JOB_A, INIT); changeState(taskA, PENDING); changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED); @@ -171,7 +183,7 @@ public class TaskVarsTest extends EasyMockTest { expectGetHostRack("hostA", "rackA").atLeastOnce(); expectStatExport(rackStatName("rackA")); - control.replay(); + replayAndBuild(); schedulerActivated(); changeState(makeTask(JOB_A, INIT), PENDING); @@ -200,13 +212,16 @@ public class TaskVarsTest extends EasyMockTest { expectStatExport(rackStatName("rackA")); expectStatExport(rackStatName("rackB")); - control.replay(); + IScheduledTask failedTask = makeTask(JOB_B, FAILED, "hostB"); + expectStatExport(jobStatName(failedTask, FAILED), untrackedProvider); + + replayAndBuild(); schedulerActivated( makeTask(JOB_A, PENDING), makeTask(JOB_A, RUNNING, "hostA"), makeTask(JOB_A, FINISHED, "hostA"), makeTask(JOB_B, PENDING), - makeTask(JOB_B, FAILED, "hostB")); + failedTask); assertEquals(2, getValue(PENDING)); assertEquals(1, getValue(RUNNING)); @@ -214,6 +229,7 @@ public class TaskVarsTest extends EasyMockTest { assertEquals(1, getValue(FAILED)); assertEquals(0, getValue(rackStatName("rackA"))); assertEquals(0, getValue(rackStatName("rackB"))); + assertEquals(1, getValue(jobStatName(failedTask, FAILED))); } private IExpectationSetters expectGetHostRack(String host, String rackToReturn) { @@ -235,14 +251,18 @@ public class TaskVarsTest extends EasyMockTest { expectStatExport(rackStatName("rackA")); expectStatExport(rackStatName("rackB")); - control.replay(); - schedulerActivated(); - IScheduledTask a = makeTask("jobA", RUNNING, "host1"); IScheduledTask b = makeTask("jobB", RUNNING, "host2"); - IScheduledTask c = makeTask("jobC", RUNNING, "host3"); + IScheduledTask c = makeTask("jobD", RUNNING, "host3"); IScheduledTask d = makeTask("jobD", RUNNING, "host1"); + expectStatExport(jobStatName(a, LOST), untrackedProvider); + expectStatExport(jobStatName(b, LOST), untrackedProvider); + expectStatExport(jobStatName(c, LOST), untrackedProvider); + + replayAndBuild(); + schedulerActivated(); + changeState(a, LOST); changeState(b, LOST); changeState(c, LOST); @@ -250,6 +270,10 @@ public class TaskVarsTest extends EasyMockTest { assertEquals(2, getValue(rackStatName("rackA"))); assertEquals(2, getValue(rackStatName("rackB"))); + + assertEquals(1, getValue(jobStatName(a, LOST))); + assertEquals(1, getValue(jobStatName(b, LOST))); + assertEquals(2, getValue(jobStatName(c, LOST))); } @Test @@ -258,10 +282,12 @@ public class TaskVarsTest extends EasyMockTest { expect(storageUtil.attributeStore.getHostAttributes("a")) .andReturn(Optional.absent()); - control.replay(); + IScheduledTask a = makeTask(JOB_A, RUNNING, "a"); + expectStatExport(jobStatName(a, LOST), untrackedProvider); + + replayAndBuild(); schedulerActivated(); - IScheduledTask a = makeTask(JOB_A, RUNNING, "a"); changeState(a, LOST); // Since no attributes are stored for the host, a variable is not exported/updated. }