aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Adding support for per-job task status metrics.
Date Fri, 05 Sep 2014 22:17:52 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 61e0bc667 -> 019ca28f3


Adding support for per-job task status metrics.

Bugs closed: AURORA-685

Reviewed at https://reviews.apache.org/r/25357/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/019ca28f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/019ca28f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/019ca28f

Branch: refs/heads/master
Commit: 019ca28f363e6f64a02a534f8ddcbfad8f891380
Parents: 61e0bc6
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri Sep 5 15:17:29 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Sep 5 15:17:29 2014 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/TaskVars.java   | 66 +++++++++++++++-----
 .../apache/aurora/scheduler/TaskVarsTest.java   | 62 ++++++++++++------
 2 files changed, 94 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/019ca28f/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 6654c16..f1ab934 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -29,11 +29,13 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -52,8 +54,11 @@ import static java.util.Objects.requireNonNull;
  */
 class TaskVars implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
+  private static final ImmutableSet<ScheduleStatus> TRACKED_JOB_STATES =
+      ImmutableSet.of(ScheduleStatus.LOST, ScheduleStatus.FAILED);
 
   private final LoadingCache<String, Counter> counters;
+  private final LoadingCache<String, Counter> untrackedCounters;
   private final Storage storage;
   private volatile boolean exporting = false;
 
@@ -61,10 +66,15 @@ class TaskVars implements EventSubscriber {
   TaskVars(Storage storage, final StatsProvider statProvider) {
     this.storage = requireNonNull(storage);
     requireNonNull(statProvider);
-    counters = CacheBuilder.newBuilder().build(new CacheLoader<String, Counter>() {
+    counters = buildCache(statProvider);
+    untrackedCounters = buildCache(statProvider.untracked());
+  }
+
+  private LoadingCache<String, Counter> buildCache(final StatsProvider provider) {
+    return CacheBuilder.newBuilder().build(new CacheLoader<String, Counter>() {
       @Override
       public Counter load(String statName) {
-        Counter counter = new Counter(statProvider);
+        Counter counter = new Counter(provider);
         if (exporting) {
           counter.exportAs(statName);
         }
@@ -83,6 +93,14 @@ class TaskVars implements EventSubscriber {
     return "tasks_lost_rack_" + rack;
   }
 
+  @VisibleForTesting
+  static String jobStatName(IScheduledTask task, ScheduleStatus status) {
+    return String.format(
+        "tasks_%s_%s",
+        status,
+        JobKeys.canonicalString(JobKeys.from(task.getAssignedTask().getTask())));
+  }
+
   private static final Predicate<IAttribute> IS_RACK = new Predicate<IAttribute>() {
     @Override
     public boolean apply(IAttribute attr) {
@@ -110,13 +128,10 @@ class TaskVars implements EventSubscriber {
     getCounter(status).decrement();
   }
 
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    IScheduledTask task = stateChange.getTask();
-    Optional<ScheduleStatus> previousState = stateChange.getOldState();
-    final String host = stateChange.getTask().getAssignedTask().getSlaveHost();
+  private void updateRackCounters(IScheduledTask task, ScheduleStatus newState) {
+    final String host = task.getAssignedTask().getSlaveHost();
     Optional<String> rack;
-    if (Strings.isNullOrEmpty(stateChange.getTask().getAssignedTask().getSlaveHost())) {
+    if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) {
       rack = Optional.absent();
     } else {
       rack = storage.consistentRead(new Work.Quiet<Optional<String>>() {
@@ -136,13 +151,7 @@ class TaskVars implements EventSubscriber {
       counters.getUnchecked(rackStatName(rack.get()));
     }
 
-    if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) {
-      decrementCount(previousState.get());
-    }
-
-    incrementCount(task.getStatus());
-
-    if (stateChange.getNewState() == ScheduleStatus.LOST) {
+    if (newState == ScheduleStatus.LOST) {
       if (rack.isPresent()) {
         counters.getUnchecked(rackStatName(rack.get())).increment();
       } else {
@@ -151,6 +160,26 @@ class TaskVars implements EventSubscriber {
     }
   }
 
+  private void updateJobCounters(IScheduledTask task, ScheduleStatus newState) {
+    if (TRACKED_JOB_STATES.contains(newState)) {
+      untrackedCounters.getUnchecked(jobStatName(task, newState)).increment();
+    }
+  }
+
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    IScheduledTask task = stateChange.getTask();
+    Optional<ScheduleStatus> previousState = stateChange.getOldState();
+
+    if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) {
+      decrementCount(previousState.get());
+    }
+    incrementCount(task.getStatus());
+
+    updateRackCounters(task, task.getStatus());
+    updateJobCounters(task, task.getStatus());
+  }
+
   @Subscribe
   public void schedulerActive(SchedulerActive event) {
     // Dummy read the counter for each status counter. This is important to guarantee a stat with
@@ -160,10 +189,15 @@ class TaskVars implements EventSubscriber {
       getCounter(status);
     }
 
+    exportCounters(counters.asMap());
+    exportCounters(untrackedCounters.asMap());
+  }
+
+  private void exportCounters(Map<String, Counter> counterMap) {
     // Initiate export of all counters.  This is not done initially to avoid exporting values that
     // do not represent the entire storage contents.
     exporting = true;
-    for (Map.Entry<String, Counter> entry : counters.asMap().entrySet()) {
+    for (Map.Entry<String, Counter> entry : counterMap.entrySet()) {
       entry.getValue().exportAs(entry.getKey());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/019ca28f/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index d02714c..371ae87 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -50,6 +50,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.TaskVars.jobStatName;
 import static org.apache.aurora.scheduler.TaskVars.rackStatName;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -61,24 +62,35 @@ public class TaskVarsTest extends EasyMockTest {
   private static final String JOB_A = "job_a";
   private static final String JOB_B = "job_b";
   private static final String TASK_ID = "task_id";
+  private static final String ENV = "test";
 
   private StorageTestUtil storageUtil;
-  private StatsProvider trackedStats;
+  private StatsProvider trackedProvider;
+  private StatsProvider untrackedProvider;
   private TaskVars vars;
   private Map<String, Supplier<Long>> globalCounters;
 
   @Before
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
-    trackedStats = createMock(StatsProvider.class);
-    vars = new TaskVars(storageUtil.storage, trackedStats);
-
+    trackedProvider = createMock(StatsProvider.class);
+    untrackedProvider = createMock(StatsProvider.class);
+    expect(trackedProvider.untracked()).andReturn(untrackedProvider);
     storageUtil.expectOperations();
     globalCounters = Maps.newHashMap();
   }
 
-  private void expectStatExport(final String name) {
-    expect(trackedStats.makeGauge(EasyMock.eq(name), EasyMock.<Supplier<Long>>anyObject()))
+  private void replayAndBuild() {
+    control.replay();
+    vars = new TaskVars(storageUtil.storage, trackedProvider);
+  }
+
+  private void expectStatExport(String name) {
+    expectStatExport(name, trackedProvider);
+  }
+
+  private void expectStatExport(final String name, StatsProvider provider) {
+    expect(provider.makeGauge(EasyMock.eq(name), EasyMock.<Supplier<Long>>anyObject()))
         .andAnswer(new IAnswer<Stat<Long>>() {
           @SuppressWarnings("unchecked")
           @Override
@@ -116,6 +128,7 @@ public class TaskVarsTest extends EasyMockTest {
             .setTaskId(TASK_ID)
             .setTask(new TaskConfig()
                 .setJobName(job)
+                .setEnvironment(ENV)
                 .setOwner(new Identity(ROLE_A, ROLE_A + "-user"))));
     if (Tasks.SLAVE_ASSIGNED_STATES.contains(status) || Tasks.isTerminated(status)) {
       task.getAssignedTask().setSlaveHost(host);
@@ -138,7 +151,7 @@ public class TaskVarsTest extends EasyMockTest {
   public void testStartsAtZero() {
     expectStatusCountersInitialized();
 
-    control.replay();
+    replayAndBuild();
     schedulerActivated();
 
     assertAllZero();
@@ -146,10 +159,9 @@ public class TaskVarsTest extends EasyMockTest {
 
   @Test
   public void testNoEarlyExport() {
-    control.replay();
+    replayAndBuild();
 
     // No variables should be exported since schedulerActive is never called.
-    vars = new TaskVars(storageUtil.storage, trackedStats);
     IScheduledTask taskA = makeTask(JOB_A, INIT);
     changeState(taskA, PENDING);
     changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
@@ -171,7 +183,7 @@ public class TaskVarsTest extends EasyMockTest {
     expectGetHostRack("hostA", "rackA").atLeastOnce();
     expectStatExport(rackStatName("rackA"));
 
-    control.replay();
+    replayAndBuild();
     schedulerActivated();
 
     changeState(makeTask(JOB_A, INIT), PENDING);
@@ -200,13 +212,16 @@ public class TaskVarsTest extends EasyMockTest {
     expectStatExport(rackStatName("rackA"));
     expectStatExport(rackStatName("rackB"));
 
-    control.replay();
+    IScheduledTask failedTask = makeTask(JOB_B, FAILED, "hostB");
+    expectStatExport(jobStatName(failedTask, FAILED), untrackedProvider);
+
+    replayAndBuild();
     schedulerActivated(
         makeTask(JOB_A, PENDING),
         makeTask(JOB_A, RUNNING, "hostA"),
         makeTask(JOB_A, FINISHED, "hostA"),
         makeTask(JOB_B, PENDING),
-        makeTask(JOB_B, FAILED, "hostB"));
+        failedTask);
 
     assertEquals(2, getValue(PENDING));
     assertEquals(1, getValue(RUNNING));
@@ -214,6 +229,7 @@ public class TaskVarsTest extends EasyMockTest {
     assertEquals(1, getValue(FAILED));
     assertEquals(0, getValue(rackStatName("rackA")));
     assertEquals(0, getValue(rackStatName("rackB")));
+    assertEquals(1, getValue(jobStatName(failedTask, FAILED)));
   }
 
   private IExpectationSetters<?> expectGetHostRack(String host, String rackToReturn) {
@@ -235,14 +251,18 @@ public class TaskVarsTest extends EasyMockTest {
     expectStatExport(rackStatName("rackA"));
     expectStatExport(rackStatName("rackB"));
 
-    control.replay();
-    schedulerActivated();
-
     IScheduledTask a = makeTask("jobA", RUNNING, "host1");
     IScheduledTask b = makeTask("jobB", RUNNING, "host2");
-    IScheduledTask c = makeTask("jobC", RUNNING, "host3");
+    IScheduledTask c = makeTask("jobD", RUNNING, "host3");
     IScheduledTask d = makeTask("jobD", RUNNING, "host1");
 
+    expectStatExport(jobStatName(a, LOST), untrackedProvider);
+    expectStatExport(jobStatName(b, LOST), untrackedProvider);
+    expectStatExport(jobStatName(c, LOST), untrackedProvider);
+
+    replayAndBuild();
+    schedulerActivated();
+
     changeState(a, LOST);
     changeState(b, LOST);
     changeState(c, LOST);
@@ -250,6 +270,10 @@ public class TaskVarsTest extends EasyMockTest {
 
     assertEquals(2, getValue(rackStatName("rackA")));
     assertEquals(2, getValue(rackStatName("rackB")));
+
+    assertEquals(1, getValue(jobStatName(a, LOST)));
+    assertEquals(1, getValue(jobStatName(b, LOST)));
+    assertEquals(2, getValue(jobStatName(c, LOST)));
   }
 
   @Test
@@ -258,10 +282,12 @@ public class TaskVarsTest extends EasyMockTest {
     expect(storageUtil.attributeStore.getHostAttributes("a"))
         .andReturn(Optional.<IHostAttributes>absent());
 
-    control.replay();
+    IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
+    expectStatExport(jobStatName(a, LOST), untrackedProvider);
+
+    replayAndBuild();
     schedulerActivated();
 
-    IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
     changeState(a, LOST);
     // Since no attributes are stored for the host, a variable is not exported/updated.
   }


Mime
View raw message