Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C8761051A for ; Wed, 22 Jan 2014 17:55:43 +0000 (UTC) Received: (qmail 9560 invoked by uid 500); 22 Jan 2014 17:55:42 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 9534 invoked by uid 500); 22 Jan 2014 17:55:42 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 9523 invoked by uid 99); 22 Jan 2014 17:55:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jan 2014 17:55:42 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 22 Jan 2014 17:55:38 +0000 Received: (qmail 8519 invoked by uid 99); 22 Jan 2014 17:55:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jan 2014 17:55:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 075DD8A012A; Wed, 22 Jan 2014 17:55:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: Only export counters in TaskVars after SchedulerActive event. Date: Wed, 22 Jan 2014 17:55:15 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master d7a82dc42 -> 479791381 Only export counters in TaskVars after SchedulerActive event. Bugs closed: AURORA-59 Reviewed at https://reviews.apache.org/r/17095/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/47979138 Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/47979138 Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/47979138 Branch: refs/heads/master Commit: 47979138100b6049acd7d6b2e4aba8858bd44040 Parents: d7a82dc Author: Bill Farner Authored: Wed Jan 22 09:48:57 2014 -0800 Committer: Bill Farner Committed: Wed Jan 22 09:48:57 2014 -0800 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/TaskVars.java | 72 ++++++++++++++----- .../apache/aurora/scheduler/TaskVarsTest.java | 73 +++++++++++++------- 2 files changed, 101 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/main/java/org/apache/aurora/scheduler/TaskVars.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java index d5d752e..38e112a 100644 --- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java +++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java @@ -15,6 +15,7 @@ */ package org.apache.aurora.scheduler; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -24,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -52,23 +54,21 @@ import static com.google.common.base.Preconditions.checkNotNull; class TaskVars implements EventSubscriber { private static final Logger LOG = Logger.getLogger(TaskVars.class.getName()); - private final LoadingCache countersByStatus; - private final LoadingCache countersByRack; - + private final LoadingCache counters; private final Storage storage; + private volatile boolean exporting = false; @Inject TaskVars(Storage storage, final StatsProvider statProvider) { this.storage = checkNotNull(storage); checkNotNull(statProvider); - countersByStatus = CacheBuilder.newBuilder().build(new CacheLoader() { - @Override public AtomicLong load(String statName) { - return statProvider.makeCounter(statName); - } - }); - countersByRack = CacheBuilder.newBuilder().build(new CacheLoader() { - @Override public AtomicLong load(String rack) { - return statProvider.makeCounter(rackStatName(rack)); + counters = CacheBuilder.newBuilder().build(new CacheLoader() { + @Override public Counter load(String statName) { + Counter counter = new Counter(statProvider); + if (exporting) { + counter.exportAs(statName); + } + return counter; } }); } @@ -95,16 +95,16 @@ class TaskVars implements EventSubscriber { } }; - private AtomicLong getCounter(ScheduleStatus status) { - return countersByStatus.getUnchecked(getVarName(status)); + private Counter getCounter(ScheduleStatus status) { + return counters.getUnchecked(getVarName(status)); } private void incrementCount(ScheduleStatus status) { - getCounter(status).incrementAndGet(); + getCounter(status).increment(); } private void decrementCount(ScheduleStatus status) { - getCounter(status).decrementAndGet(); + getCounter(status).decrement(); } @Subscribe @@ -129,7 +129,7 @@ class TaskVars implements EventSubscriber { }); if (rack.isPresent()) { - countersByRack.getUnchecked(rack.get()).incrementAndGet(); + counters.getUnchecked(rackStatName(rack.get())).increment(); } else { LOG.warning("Failed to find rack attribute associated with host " + host); } @@ -138,15 +138,19 @@ class TaskVars implements EventSubscriber { @Subscribe public void schedulerActive(SchedulerActive event) { - // TODO(wfarner): This should probably induce the initial 'export' of stats, so that incomplete - // values are not surfaced while storage is recovering. - // Dummy read the counter for each status counter. This is important to guarantee a stat with // value zero is present for each state, even if all states are not represented in the task // store. for (ScheduleStatus status : ScheduleStatus.values()) { getCounter(status); } + + // Initiate export of all counters. This is not done initially to avoid exporting values that + // do not represent the entire storage contents. + exporting = true; + for (Map.Entry entry : counters.asMap().entrySet()) { + entry.getValue().exportAs(entry.getKey()); + } } @Subscribe @@ -155,4 +159,34 @@ class TaskVars implements EventSubscriber { decrementCount(task.getStatus()); } } + + private static class Counter implements Supplier { + private final AtomicLong value = new AtomicLong(); + private boolean exported = false; + private final StatsProvider stats; + + Counter(StatsProvider stats) { + this.stats = stats; + } + + @Override + public Long get() { + return value.get(); + } + + private synchronized void exportAs(String name) { + if (!exported) { + stats.makeGauge(name, this); + exported = true; + } + } + + private void increment() { + value.incrementAndGet(); + } + + private void decrement() { + value.decrementAndGet(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java index dde053c..d6fc72e 100644 --- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java @@ -16,11 +16,12 @@ package org.apache.aurora.scheduler; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import com.twitter.common.stats.Stat; import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; @@ -36,6 +37,8 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.easymock.IExpectationSetters; import org.junit.Before; import org.junit.Test; @@ -49,6 +52,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class TaskVarsTest extends EasyMockTest { @@ -60,7 +64,7 @@ public class TaskVarsTest extends EasyMockTest { private StorageTestUtil storageUtil; private StatsProvider trackedStats; private TaskVars vars; - private Map globalCounters; + private Map> globalCounters; @Before public void setUp() { @@ -72,11 +76,21 @@ public class TaskVarsTest extends EasyMockTest { globalCounters = Maps.newHashMap(); } + private void expectStatExport(final String name) { + expect(trackedStats.makeGauge(EasyMock.eq(name), EasyMock.>anyObject())) + .andAnswer(new IAnswer>() { + @SuppressWarnings("unchecked") + @Override public Stat answer() { + assertFalse(globalCounters.containsKey(name)); + globalCounters.put(name, (Supplier) EasyMock.getCurrentArguments()[1]); + return null; + } + }); + } + private void expectStatusCountersInitialized() { for (ScheduleStatus status : ScheduleStatus.values()) { - AtomicLong counter = new AtomicLong(0); - globalCounters.put(status, counter); - expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter); + expectStatExport(TaskVars.getVarName(status)); } } @@ -109,8 +123,8 @@ public class TaskVarsTest extends EasyMockTest { } private void assertAllZero() { - for (AtomicLong counter : globalCounters.values()) { - assertEquals(0L, counter.get()); + for (Supplier counter : globalCounters.values()) { + assertEquals(0L, counter.get().longValue()); } } @@ -128,8 +142,19 @@ public class TaskVarsTest extends EasyMockTest { public void testNoEarlyExport() { control.replay(); - // No variables should be exported prior to storage starting. + // No variables should be exported since schedulerActive is never called. vars = new TaskVars(storageUtil.storage, trackedStats); + IScheduledTask taskA = makeTask(JOB_A, INIT); + changeState(taskA, PENDING); + changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED); + } + + private int getValue(String name) { + return globalCounters.get(name).get().intValue(); + } + + private int getValue(ScheduleStatus status) { + return getValue(TaskVars.getVarName(status)); } @Test @@ -141,16 +166,16 @@ public class TaskVarsTest extends EasyMockTest { IScheduledTask taskA = makeTask(JOB_A, INIT); changeState(taskA, PENDING); - assertEquals(1, globalCounters.get(PENDING).get()); + assertEquals(1, getValue(PENDING)); changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED); - assertEquals(0, globalCounters.get(PENDING).get()); - assertEquals(1, globalCounters.get(ASSIGNED).get()); + assertEquals(0, getValue(PENDING)); + assertEquals(1, getValue(ASSIGNED)); changeState(IScheduledTask.build(taskA.newBuilder().setStatus(ASSIGNED)), RUNNING); - assertEquals(0, globalCounters.get(ASSIGNED).get()); - assertEquals(1, globalCounters.get(RUNNING).get()); + assertEquals(0, getValue(ASSIGNED)); + assertEquals(1, getValue(RUNNING)); changeState(IScheduledTask.build(taskA.newBuilder().setStatus(RUNNING)), FINISHED); - assertEquals(0, globalCounters.get(RUNNING).get()); - assertEquals(1, globalCounters.get(FINISHED).get()); + assertEquals(0, getValue(RUNNING)); + assertEquals(1, getValue(FINISHED)); vars.tasksDeleted(new TasksDeleted(ImmutableSet.of( IScheduledTask.build(taskA.newBuilder().setStatus(FINISHED))))); assertAllZero(); @@ -168,10 +193,10 @@ public class TaskVarsTest extends EasyMockTest { makeTask(JOB_B, PENDING), makeTask(JOB_B, FAILED)); - assertEquals(2, globalCounters.get(PENDING).get()); - assertEquals(1, globalCounters.get(RUNNING).get()); - assertEquals(1, globalCounters.get(FINISHED).get()); - assertEquals(1, globalCounters.get(FAILED).get()); + assertEquals(2, getValue(PENDING)); + assertEquals(1, getValue(RUNNING)); + assertEquals(1, getValue(FINISHED)); + assertEquals(1, getValue(FAILED)); } private IExpectationSetters expectGetHostRack(String host, String rackToReturn) { @@ -190,10 +215,8 @@ public class TaskVarsTest extends EasyMockTest { expectGetHostRack("host2", "rackB").atLeastOnce(); expectGetHostRack("host3", "rackB").atLeastOnce(); - AtomicLong rackA = new AtomicLong(); - expect(trackedStats.makeCounter(TaskVars.rackStatName("rackA"))).andReturn(rackA); - AtomicLong rackB = new AtomicLong(); - expect(trackedStats.makeCounter(TaskVars.rackStatName("rackB"))).andReturn(rackB); + expectStatExport(TaskVars.rackStatName("rackA")); + expectStatExport(TaskVars.rackStatName("rackB")); control.replay(); schedulerActivated(); @@ -208,8 +231,8 @@ public class TaskVarsTest extends EasyMockTest { changeState(c, LOST); changeState(d, LOST); - assertEquals(2, rackA.get()); - assertEquals(2, rackB.get()); + assertEquals(2, getValue(TaskVars.rackStatName("rackA"))); + assertEquals(2, getValue(TaskVars.rackStatName("rackB"))); } @Test