Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1956818A69 for ; Fri, 22 Jan 2016 01:38:36 +0000 (UTC) Received: (qmail 6484 invoked by uid 500); 22 Jan 2016 01:38:36 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 6444 invoked by uid 500); 22 Jan 2016 01:38:36 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 6435 invoked by uid 99); 22 Jan 2016 01:38:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jan 2016 01:38:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DA83DDFF94; Fri, 22 Jan 2016 01:38:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Message-Id: <131808b7bcd244b68c039ff084a182c2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure. Date: Fri, 22 Jan 2016 01:38:35 +0000 (UTC) Repository: aurora Updated Branches: refs/heads/master a2c7ccc17 -> c89fecbcd Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure. Task pruning is key to operating a large cluster and failure to prune should trigger shutdown to prevent unbounded growth of storage. This patch turns `TaskHistoryPruner` into a service which propagates failure from failed pruning attempts towards the `ServiceManager`. 
This patch also completes a TODO by removing a test for behaviour that is very awkward to test. Bugs closed: AURORA-1582 Reviewed at https://reviews.apache.org/r/42332/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c89fecbc Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c89fecbc Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c89fecbc Branch: refs/heads/master Commit: c89fecbcd93aa6ddcf6af60f2a2cb6315b7a4d19 Parents: a2c7ccc Author: Zameer Manji Authored: Thu Jan 21 17:38:25 2016 -0800 Committer: Zameer Manji Committed: Thu Jan 21 17:38:25 2016 -0800 ---------------------------------------------------------------------- .../aurora/scheduler/pruning/PruningModule.java | 1 + .../scheduler/pruning/TaskHistoryPruner.java | 52 ++++++-- .../pruning/TaskHistoryPrunerTest.java | 126 ++++--------------- 3 files changed, 68 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java index 735199a..efdfbda 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java @@ -83,6 +83,7 @@ public class PruningModule extends AbstractModule { expose(TaskHistoryPruner.class); } }); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskHistoryPruner.class); PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class); install(new PrivateModule() { http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index 2064089..2d4c58e 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -14,6 +14,9 @@ package org.apache.aurora.scheduler.pruning; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import javax.inject.Inject; @@ -22,7 +25,9 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Queues; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractScheduledService; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -42,6 +47,8 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; +import static com.google.common.base.Preconditions.checkState; + import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -49,7 +56,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks * transitioning into one of the inactive states. 
*/ -public class TaskHistoryPruner implements EventSubscriber { +public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber { private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class); private final DelayExecutor executor; @@ -57,6 +64,7 @@ public class TaskHistoryPruner implements EventSubscriber { private final Clock clock; private final HistoryPrunnerSettings settings; private final Storage storage; + private final ConcurrentLinkedQueue> futureTasks; private final Predicate safeToDelete = new Predicate() { @Override @@ -95,6 +103,7 @@ public class TaskHistoryPruner implements EventSubscriber { this.clock = requireNonNull(clock); this.settings = requireNonNull(settings); this.storage = requireNonNull(storage); + this.futureTasks = Queues.newConcurrentLinkedQueue(); } @VisibleForTesting @@ -111,6 +120,8 @@ public class TaskHistoryPruner implements EventSubscriber { */ @Subscribe public void recordStateChange(TaskStateChange change) { + checkState(isRunning()); + if (Tasks.isTerminated(change.getNewState())) { long timeoutBasis = change.isTransition() ? clock.nowMillis() @@ -122,6 +133,22 @@ public class TaskHistoryPruner implements EventSubscriber { } } + @Override + protected void runOneIteration() throws Exception { + // Check if the prune attempts fail and propagate the exception. This will trigger + // service (and the scheduler) to shut down. 
+ FutureTask future; + + while ((future = futureTasks.poll()) != null) { + future.get(); + } + } + + @Override + protected Scheduler scheduler() { + return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS); + } + private void deleteTasks(final Set taskIds) { LOG.info("Pruning inactive tasks " + taskIds); storage.write( @@ -139,14 +166,16 @@ public class TaskHistoryPruner implements EventSubscriber { long timeRemaining) { LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms."); - executor.execute( - () -> { - LOG.info("Pruning expired inactive task " + taskId); - deleteTasks(ImmutableSet.of(taskId)); - }, - Amount.of(timeRemaining, Time.MILLISECONDS)); - - executor.execute(() -> { + + FutureTask pruneSingleTask = new FutureTask<>(() -> { + LOG.info("Pruning expired inactive task " + taskId); + deleteTasks(ImmutableSet.of(taskId)); + }, null); + futureTasks.add(pruneSingleTask); + + executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS)); + + FutureTask pruneRemainingTasksFromJob = new FutureTask<>(() -> { Iterable inactiveTasks = Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); int numInactiveTasks = Iterables.size(inactiveTasks); @@ -162,6 +191,9 @@ public class TaskHistoryPruner implements EventSubscriber { deleteTasks(toPrune); } } - }); + }, null); + futureTasks.add(pruneRemainingTasksFromJob); + + executor.execute(pruneRemainingTasksFromJob); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java index 295960f..e1b5391 100644 --- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java +++ 
b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -13,26 +13,15 @@ */ package org.apache.aurora.scheduler.pruning; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.io.Closer; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Key; - -import org.apache.aurora.common.base.Command; +import com.google.common.util.concurrent.Service; + import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; @@ -43,8 +32,6 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.async.AsyncModule; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -53,14 +40,12 @@ import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import 
org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; import static org.apache.aurora.gen.ScheduleStatus.FINISHED; import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.LOST; @@ -68,11 +53,12 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.STARTING; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; public class TaskHistoryPrunerTest extends EasyMockTest { private static final String JOB_A = "job-a"; - private static final String TASK_ID = "task_id"; private static final String SLAVE_HOST = "HOST_A"; private static final Amount ONE_MS = Amount.of(1L, Time.MILLISECONDS); private static final Amount ONE_MINUTE = Amount.of(1L, Time.MINUTES); @@ -101,6 +87,8 @@ public class TaskHistoryPrunerTest extends EasyMockTest { new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY), storageUtil.storage); closer = Closer.create(); + + pruner.startAsync().awaitRunning(); } @After @@ -249,89 +237,30 @@ public class TaskHistoryPrunerTest extends EasyMockTest { changeState(d, dLost); } - // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks - // are not used, it is rather awkward to test this. @Test - public void testThreadSafeStateChangeEvent() throws Exception { - // This tests against regression where an executor pruning a task holds an intrinsic lock and - // an unrelated task state change in the scheduler fires an event that requires this intrinsic - // lock. This causes a deadlock when the executor tries to acquire a lock held by the event - // fired. 
- - ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("testThreadSafeEvents-executor") - .build()); - closer.register( - () -> MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS)); - - Injector injector = Guice.createInjector( - new AsyncModule(realExecutor), - new AbstractModule() { - @Override - protected void configure() { - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - } - }); - executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class)); - - pruner = buildPruner(executor); - // The goal is to verify that the call does not deadlock. We do not care about the outcome. - Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING); - CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID); + public void serviceShutdownOnFailure() { + IScheduledTask running = makeTask("a", RUNNING); + IScheduledTask killed = copy(running, KILLED); + expectNoImmediatePrune(ImmutableSet.of(running)); + Capture delayedDelete = expectDefaultDelayedPrune(); - control.replay(); + expectDeleteTasks("a"); + expectLastCall().andThrow(new RuntimeException("oops")); - // Change the task to a terminal state and wait for it to be pruned. 
- changeState(makeTask(TASK_ID, RUNNING), KILLED); - taskDeleted.await(); - } + control.replay(); - private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) { - return new TaskHistoryPruner( - delayExecutor, - stateManager, - clock, - new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY), - storageUtil.storage); - } + changeState(running, killed); + clock.advance(ONE_HOUR); + delayedDelete.getValue().run(); + // awaitTerminated throws an IllegalStateException if the service fails + try { + pruner.awaitTerminated(); + fail(); + } catch (IllegalStateException e) { + assertEquals(Service.State.FAILED, pruner.state()); + } - private CountDownLatch expectTaskDeleted(Command onDelete, String taskId) { - CountDownLatch deleteCalled = new CountDownLatch(1); - CountDownLatch eventDelivered = new CountDownLatch(1); - - Thread eventDispatch = new Thread() { - @Override - public void run() { - try { - deleteCalled.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted while awaiting for delete call."); - return; - } - onDelete.execute(); - eventDelivered.countDown(); - } - }; - eventDispatch.setDaemon(true); - eventDispatch.setName(getClass().getName() + "-EventDispatch"); - eventDispatch.start(); - - stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId)); - expectLastCall().andAnswer(() -> { - deleteCalled.countDown(); - try { - eventDelivered.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted while awaiting for event delivery."); - } - return null; - }); - - return eventDelivered; + assertNotNull(pruner.failureCause()); } private void expectDeleteTasks(String... 
tasks) { @@ -384,11 +313,6 @@ public class TaskHistoryPrunerTest extends EasyMockTest { pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus())); } - private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) { - pruner.recordStateChange( - TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus())); - } - private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) { return IScheduledTask.build(task.newBuilder().setStatus(status)); }