Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1D1C2173CE for ; Wed, 22 Jul 2015 19:40:02 +0000 (UTC) Received: (qmail 76030 invoked by uid 500); 22 Jul 2015 19:39:59 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 75937 invoked by uid 500); 22 Jul 2015 19:39:59 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 75846 invoked by uid 99); 22 Jul 2015 19:39:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2015 19:39:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BF461E01FB; Wed, 22 Jul 2015 19:39:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Wed, 22 Jul 2015 19:40:01 -0000 Message-Id: In-Reply-To: <76b9bd048c524f7194b93975aab8959e@git.apache.org> References: <76b9bd048c524f7194b93975aab8959e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents. http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java b/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java deleted file mode 100644 index 1aed40c..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.Random; - -import org.junit.Test; - -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class RandomJitterReturnDelayTest extends EasyMockTest { - private void assertRandomJitterReturnDelay( - int minHoldTimeMs, - int jitterWindowMs, - boolean shouldThrow) { - - int randomValue = 123; - - Random mockRandom = control.createMock(Random.class); - - if (!shouldThrow) { - expect(mockRandom.nextInt(jitterWindowMs)).andReturn(randomValue); - } - - control.replay(); - - assertEquals( - minHoldTimeMs + randomValue, - new RandomJitterReturnDelay( - minHoldTimeMs, - jitterWindowMs, - mockRandom).get().getValue().intValue()); - } - - @Test - public void testRandomJitterReturnDelay() throws Exception { - assertRandomJitterReturnDelay(100, 200, false); - } - - @Test(expected = IllegalArgumentException.class) - public void testNegativeHoldTimeThrowsIllegalArgumentException() throws Exception { - assertRandomJitterReturnDelay(-1, 200, true); - } - - @Test(expected = IllegalArgumentException.class) - public void testNegativeWindowThrowsIllegalArgumentException() throws Exception { - assertRandomJitterReturnDelay(100, -1, true); - } - - @Test - public void testZeroHoldTime() throws Exception { - assertRandomJitterReturnDelay(0, 200, false); - } - - @Test - public void testZeroWindow() throws Exception { - assertRandomJitterReturnDelay(100, 0, false); - } - - @Test - public void testZeroHoldTimeZeroWindow() throws Exception { - assertRandomJitterReturnDelay(0, 0, false); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java deleted file mode 100644 index 131bd82..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Map; -import java.util.Map.Entry; - -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Identity; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class RescheduleCalculatorImplTest extends EasyMockTest { - - private static final Amount FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES); - private static final Amount MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES); - - private StorageTestUtil storageUtil; - private BackoffStrategy backoff; - private RescheduleCalculator rescheduleCalculator; - - @Before - public void setUp() { - storageUtil = new StorageTestUtil(this); - backoff = createMock(BackoffStrategy.class); - rescheduleCalculator = new RescheduleCalculatorImpl( - storageUtil.storage, - new RescheduleCalculatorImpl.RescheduleCalculatorSettings( - backoff, - FLAPPING_THRESHOLD, - MAX_STARTUP_DELAY)); - storageUtil.expectOperations(); - } - - @Test - public void testNoPenaltyForNoAncestor() { - control.replay(); - - assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT))); - } - - @Test - public void testNoPenaltyDeletedAncestor() { - String ancestorId = "a"; - storageUtil.expectTaskFetch(Query.taskScoped(ancestorId)); - - control.replay(); - - assertEquals( - 0L, - rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId))); - } - - @Test - public void testFlappingTask() { - IScheduledTask ancestor = makeFlappyTask("a"); - storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); - long penaltyMs = 1000L; - expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs); - - control.replay(); - - assertEquals( - penaltyMs, - rescheduleCalculator.getFlappingPenaltyMs( - setAncestor(makeTask("b", INIT), Tasks.id(ancestor)))); - } - - @Test - public void testFlappingTasksBackoffTruncation() { - // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting - // ancestors once truncated. - IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried"); - IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA)); - IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB)); - IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC)); - - Map ancestorsAndPenalties = ImmutableMap.of( - taskD, 100L, - taskC, 200L, - taskB, 300L, - taskA, 300L); - - long lastPenalty = 0L; - for (Map.Entry taskAndPenalty : ancestorsAndPenalties.entrySet()) { - storageUtil.expectTaskFetch( - Query.taskScoped(Tasks.id(taskAndPenalty.getKey())), - taskAndPenalty.getKey()); - expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue()); - lastPenalty = taskAndPenalty.getValue(); - } - - control.replay(); - - IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD)); - assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask)); - } - - @Test - public void testNoPenaltyForInterruptedTasks() { - IScheduledTask ancestor = setEvents( - makeTask("a", KILLED), - ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L)); - storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); - - control.replay(); - - assertEquals( - 0L, - rescheduleCalculator.getFlappingPenaltyMs( - setAncestor(makeTask("b", INIT), Tasks.id(ancestor)))); - } - - private IScheduledTask makeFlappyTask(String taskId) { - return setEvents( - makeTask(taskId, FINISHED), - ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L)); - } - - private IScheduledTask makeTask(String taskId) { - return IScheduledTask.build(new ScheduledTask() - .setAssignedTask(new AssignedTask() - .setInstanceId(0) - .setTaskId(taskId) - .setTask(new TaskConfig() - .setJobName("job-" + taskId) - .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId)) - .setEnvironment("env-" + taskId)))); - } - - private IScheduledTask makeTask(String taskId, ScheduleStatus status) { - return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status)); - } - - private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) { - return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId)); - } - - private static final Function, TaskEvent> TO_EVENT = - new Function, TaskEvent>() { - @Override - public TaskEvent apply(Entry input) { - return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue()); - } - }; - - private IScheduledTask setEvents(IScheduledTask task, Map events) { - return IScheduledTask.build(task.newBuilder().setTaskEvents( - FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList())); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java deleted file mode 100644 index 51256f4..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; - -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Identity; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; - -public class TaskGroupsTest extends EasyMockTest { - - private static final long FIRST_SCHEDULE_DELAY_MS = 1L; - - private ScheduledExecutorService executor; - private BackoffStrategy backoffStrategy; - private TaskScheduler taskScheduler; - private RateLimiter rateLimiter; - - private TaskGroups taskGroups; - - @Before - public void setUp() throws Exception { - executor = createMock(ScheduledExecutorService.class); - backoffStrategy = createMock(BackoffStrategy.class); - taskScheduler = createMock(TaskScheduler.class); - rateLimiter = createMock(RateLimiter.class); - taskGroups = new TaskGroups( - executor, - Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), - backoffStrategy, - rateLimiter, - taskScheduler, - createMock(RescheduleCalculator.class)); - } - - @Test - public void testEvaluatedAfterFirstSchedulePenalty() { - executor.schedule( - EasyMock.anyObject(), - EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), - EasyMock.eq(MILLISECONDS)); - expectLastCall().andAnswer(new IAnswer>() { - @Override - public ScheduledFuture answer() { - ((Runnable) EasyMock.getCurrentArguments()[0]).run(); - return null; - } - }); - expect(rateLimiter.acquire()).andReturn(0D); - expect(taskScheduler.schedule("a")).andReturn(true); - - control.replay(); - - taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT)); - } - - private Capture expectEvaluate() { - Capture capture = createCapture(); - executor.schedule( - EasyMock.capture(capture), - EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), - EasyMock.eq(MILLISECONDS)); - expectLastCall().andReturn(null); - return capture; - } - - @Test - public void testTaskDeletedBeforeEvaluating() { - final IScheduledTask task = makeTask("a"); - - Capture evaluate = expectEvaluate(); - - expect(rateLimiter.acquire()).andReturn(0D); - expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer() { - @Override - public Boolean answer() { - // Test a corner case where a task is deleted while it is being evaluated by the task - // scheduler. If not handled carefully, this could result in the scheduler trying again - // later to satisfy the deleted task. - taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); - - return false; - } - }); - expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY_MS)).andReturn(0L); - - control.replay(); - - taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT)); - evaluate.getValue().run(); - } - - private static IScheduledTask makeTask(String id) { - return IScheduledTask.build(new ScheduledTask() - .setStatus(ScheduleStatus.PENDING) - .setAssignedTask(new AssignedTask() - .setTaskId(id) - .setTask(new TaskConfig() - .setOwner(new Identity("owner", "owner")) - .setEnvironment("test") - .setJobName("job")))); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java deleted file mode 100644 index 6eaf3ce..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java +++ /dev/null @@ -1,398 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.base.Command; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ExecutorConfig; -import org.apache.aurora.gen.Identity; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.gen.ScheduleStatus.STARTING; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.fail; - -public class TaskHistoryPrunerTest extends EasyMockTest { - private static final String JOB_A = "job-a"; - private static final String TASK_ID = "task_id"; - private static final String SLAVE_HOST = "HOST_A"; - private static final Amount ONE_MS = Amount.of(1L, Time.MILLISECONDS); - private static final Amount ONE_MINUTE = Amount.of(1L, Time.MINUTES); - private static final Amount ONE_DAY = Amount.of(1L, Time.DAYS); - private static final Amount ONE_HOUR = Amount.of(1L, Time.HOURS); - private static final int PER_JOB_HISTORY = 2; - - private ScheduledFuture future; - private ScheduledExecutorService executor; - private FakeClock clock; - private StateManager stateManager; - private StorageTestUtil storageUtil; - private TaskHistoryPruner pruner; - - @Before - public void setUp() { - future = createMock(new Clazz>() { }); - executor = createMock(ScheduledExecutorService.class); - clock = new FakeClock(); - stateManager = createMock(StateManager.class); - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - pruner = new TaskHistoryPruner( - executor, - stateManager, - clock, - new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY), - storageUtil.storage); - } - - @Test - public void testNoPruning() { - long taskATimestamp = clock.nowMillis(); - IScheduledTask a = makeTask("a", FINISHED); - - clock.advance(ONE_MS); - long taskBTimestamp = clock.nowMillis(); - IScheduledTask b = makeTask("b", LOST); - - expectNoImmediatePrune(ImmutableSet.of(a)); - expectOneDelayedPrune(taskATimestamp); - expectNoImmediatePrune(ImmutableSet.of(a, b)); - expectOneDelayedPrune(taskBTimestamp); - - control.replay(); - - pruner.recordStateChange(TaskStateChange.initialized(a)); - pruner.recordStateChange(TaskStateChange.initialized(b)); - } - - @Test - public void testStorageStartedWithPruning() { - long taskATimestamp = clock.nowMillis(); - IScheduledTask a = makeTask("a", FINISHED); - - clock.advance(ONE_MINUTE); - long taskBTimestamp = clock.nowMillis(); - IScheduledTask b = makeTask("b", LOST); - - clock.advance(ONE_MINUTE); - long taskCTimestamp = clock.nowMillis(); - IScheduledTask c = makeTask("c", FINISHED); - - clock.advance(ONE_MINUTE); - IScheduledTask d = makeTask("d", FINISHED); - IScheduledTask e = makeTask("job-x", "e", FINISHED); - - expectNoImmediatePrune(ImmutableSet.of(a)); - expectOneDelayedPrune(taskATimestamp); - expectNoImmediatePrune(ImmutableSet.of(a, b)); - expectOneDelayedPrune(taskBTimestamp); - expectImmediatePrune(ImmutableSet.of(a, b, c), a); - expectOneDelayedPrune(taskCTimestamp); - expectImmediatePrune(ImmutableSet.of(b, c, d), b); - expectDefaultDelayedPrune(); - expectNoImmediatePrune(ImmutableSet.of(e)); - expectDefaultDelayedPrune(); - - control.replay(); - - for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) { - pruner.recordStateChange(TaskStateChange.initialized(task)); - } - } - - @Test - public void testStateChange() { - IScheduledTask starting = makeTask("a", STARTING); - IScheduledTask running = copy(starting, RUNNING); - IScheduledTask killed = copy(starting, KILLED); - - expectNoImmediatePrune(ImmutableSet.of(killed)); - expectDefaultDelayedPrune(); - - control.replay(); - - // No future set for non-terminal state transition. - changeState(starting, running); - - // Future set for terminal state transition. - changeState(running, killed); - } - - @Test - public void testActivateFutureAndExceedHistoryGoal() { - IScheduledTask running = makeTask("a", RUNNING); - IScheduledTask killed = copy(running, KILLED); - expectNoImmediatePrune(ImmutableSet.of(running)); - Capture delayedDelete = expectDefaultDelayedPrune(); - - // Expect task "a" to be pruned when future is activated. - expectDeleteTasks("a"); - - control.replay(); - - // Capture future for inactive task "a" - changeState(running, killed); - clock.advance(ONE_HOUR); - // Execute future to prune task "a" from the system. - delayedDelete.getValue().run(); - } - - @Test - public void testJobHistoryExceeded() { - IScheduledTask a = makeTask("a", RUNNING); - clock.advance(ONE_MS); - IScheduledTask aKilled = copy(a, KILLED); - - IScheduledTask b = makeTask("b", RUNNING); - clock.advance(ONE_MS); - IScheduledTask bKilled = copy(b, KILLED); - - IScheduledTask c = makeTask("c", RUNNING); - clock.advance(ONE_MS); - IScheduledTask cLost = copy(c, LOST); - - IScheduledTask d = makeTask("d", RUNNING); - clock.advance(ONE_MS); - IScheduledTask dLost = copy(d, LOST); - - expectNoImmediatePrune(ImmutableSet.of(a)); - expectDefaultDelayedPrune(); - expectNoImmediatePrune(ImmutableSet.of(a, b)); - expectDefaultDelayedPrune(); - expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold - expectDefaultDelayedPrune(); - clock.advance(ONE_HOUR); - expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks - expectDefaultDelayedPrune(); - - control.replay(); - - changeState(a, aKilled); - changeState(b, bKilled); - changeState(c, cLost); - changeState(d, dLost); - } - - // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks - // are not used, it is rather awkward to test this. - @Test - public void testThreadSafeStateChangeEvent() throws Exception { - // This tests against regression where an executor pruning a task holds an intrinsic lock and - // an unrelated task state change in the scheduler fires an event that requires this intrinsic - // lock. This causes a deadlock when the executor tries to acquire a lock held by the event - // fired. - - pruner = prunerWithRealExecutor(); - Command onDeleted = new Command() { - @Override - public void execute() { - // The goal is to verify that the call does not deadlock. We do not care about the outcome. - IScheduledTask b = makeTask("b", ASSIGNED); - - changeState(b, STARTING); - } - }; - CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID); - - control.replay(); - - // Change the task to a terminal state and wait for it to be pruned. - changeState(makeTask(TASK_ID, RUNNING), KILLED); - taskDeleted.await(); - } - - private TaskHistoryPruner prunerWithRealExecutor() { - ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("testThreadSafeEvents-executor") - .build()); - return new TaskHistoryPruner( - realExecutor, - stateManager, - clock, - new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY), - storageUtil.storage); - } - - private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) { - final CountDownLatch deleteCalled = new CountDownLatch(1); - final CountDownLatch eventDelivered = new CountDownLatch(1); - - Thread eventDispatch = new Thread() { - @Override - public void run() { - try { - deleteCalled.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted while awaiting for delete call."); - return; - } - onDelete.execute(); - eventDelivered.countDown(); - } - }; - eventDispatch.setDaemon(true); - eventDispatch.setName(getClass().getName() + "-EventDispatch"); - eventDispatch.start(); - - stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId)); - expectLastCall().andAnswer(new IAnswer() { - @Override - public Void answer() { - deleteCalled.countDown(); - try { - eventDelivered.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted while awaiting for event delivery."); - } - return null; - } - }); - - return eventDelivered; - } - - private void expectDeleteTasks(String... tasks) { - stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks)); - } - - private Capture expectDefaultDelayedPrune() { - return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1); - } - - private Capture expectOneDelayedPrune(long timestampMillis) { - return expectDelayedPrune(timestampMillis, 1); - } - - private void expectNoImmediatePrune(ImmutableSet tasksInJob) { - expectImmediatePrune(tasksInJob); - } - - private void expectImmediatePrune( - ImmutableSet tasksInJob, - IScheduledTask... pruned) { - - // Expect a deferred prune operation when a new task is being watched. - executor.submit(EasyMock.anyObject()); - expectLastCall().andAnswer( - new IAnswer>() { - @Override - public Future answer() { - Runnable work = (Runnable) EasyMock.getCurrentArguments()[0]; - work.run(); - return null; - } - } - ); - - IJobKey jobKey = Iterables.getOnlyElement( - FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet()); - storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob); - if (pruned.length > 0) { - stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned)); - } - } - - private Capture expectDelayedPrune(long timestampMillis, int count) { - Capture capture = createCapture(); - executor.schedule( - EasyMock.capture(capture), - eq(pruner.calculateTimeout(timestampMillis)), - eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(future).times(count); - return capture; - } - - private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) { - pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus())); - } - - private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) { - pruner.recordStateChange( - TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus())); - } - - private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) { - return IScheduledTask.build(task.newBuilder().setStatus(status)); - } - - private IScheduledTask makeTask( - String job, - String taskId, - ScheduleStatus status) { - - return IScheduledTask.build(new ScheduledTask() - .setStatus(status) - .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status))) - .setAssignedTask(makeAssignedTask(job, taskId))); - } - - private IScheduledTask makeTask(String taskId, ScheduleStatus status) { - return makeTask(JOB_A, taskId, status); - } - - private AssignedTask makeAssignedTask(String job, String taskId) { - return new AssignedTask() - .setSlaveHost(SLAVE_HOST) - .setTaskId(taskId) - .setTask(new TaskConfig() - .setJob(new JobKey("role", "staging45", job)) - .setOwner(new Identity().setRole("role").setUser("user")) - .setEnvironment("staging45") - .setJobName(job) - .setExecutorConfig(new ExecutorConfig("aurora", "config"))); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java deleted file mode 100644 index 0011412..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.collect.ImmutableSet; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; -import org.junit.Before; -import org.junit.Test; - -import static com.twitter.common.quantity.Time.MINUTES; - -import static org.apache.aurora.scheduler.async.TaskReconciler.EXPLICIT_STAT_NAME; -import static org.apache.aurora.scheduler.async.TaskReconciler.IMPLICIT_STAT_NAME; -import static org.apache.aurora.scheduler.async.TaskReconciler.TASK_TO_PROTO; -import static org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; - -public class TaskReconcilerTest extends EasyMockTest { - private static final Amount INITIAL_DELAY = Amount.of(10L, MINUTES); - private static final Amount EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES); - private static final Amount IMPLICT_SCHEDULE = Amount.of(180L, MINUTES); - private static final Amount SPREAD = Amount.of(30L, MINUTES); - private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings( - INITIAL_DELAY, - EXPLICIT_SCHEDULE, - IMPLICT_SCHEDULE, - SPREAD); - - private StorageTestUtil storageUtil; - private StatsProvider statsProvider; - private Driver driver; - private ScheduledExecutorService executorService; - private FakeScheduledExecutor clock; - private AtomicLong explicitRuns; - private AtomicLong implicitRuns; - - @Before - public void setUp() { - storageUtil = new StorageTestUtil(this); - statsProvider = createMock(StatsProvider.class); - driver = createMock(Driver.class); - executorService = createMock(ScheduledExecutorService.class); - explicitRuns = new AtomicLong(); - implicitRuns = new AtomicLong(); - } - - @Test - public void testExecution() { - expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns); - expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns); - clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5); - - IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB); - storageUtil.expectOperations(); - storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task) - .times(5); - - driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task))); - expectLastCall().times(5); - - driver.reconcileTasks(ImmutableSet.of()); - expectLastCall().times(2); - - control.replay(); - - TaskReconciler reconciler = new TaskReconciler( - SETTINGS, - storageUtil.storage, - driver, - executorService, - statsProvider); - - reconciler.startAsync().awaitRunning(); - - clock.advance(INITIAL_DELAY); - assertEquals(1L, explicitRuns.get()); - assertEquals(0L, implicitRuns.get()); - - clock.advance(SPREAD); - assertEquals(1L, explicitRuns.get()); - assertEquals(1L, implicitRuns.get()); - - clock.advance(EXPLICIT_SCHEDULE); - assertEquals(2L, explicitRuns.get()); - assertEquals(1L, implicitRuns.get()); - - clock.advance(IMPLICT_SCHEDULE); - assertEquals(5L, explicitRuns.get()); - assertEquals(2L, implicitRuns.get()); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidImplicitDelay() throws Exception { - control.replay(); - - new TaskReconcilerSettings( - INITIAL_DELAY, - EXPLICIT_SCHEDULE, - IMPLICT_SCHEDULE, - Amount.of(Long.MAX_VALUE, MINUTES)); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidExplicitDelay() throws Exception { - control.replay(); - - new TaskReconcilerSettings( - Amount.of(Long.MAX_VALUE, MINUTES), - EXPLICIT_SCHEDULE, - IMPLICT_SCHEDULE, - SPREAD); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java deleted file mode 100644 index 45adb2e..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ /dev/null @@ -1,340 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.TypeLiteral; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.Clock; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; -import org.apache.aurora.scheduler.async.preemptor.BiCache; -import org.apache.aurora.scheduler.async.preemptor.Preemptor; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.state.PubsubTestUtil; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos.TaskInfo; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IExpectationSetters; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class TaskSchedulerImplTest extends EasyMockTest { - - private static final IScheduledTask TASK_A = - TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a")); - private static final IScheduledTask TASK_B = - TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b")); - private static final HostOffer OFFER = new HostOffer( - Offers.makeOffer("OFFER_A", "HOST_A"), - IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); - - private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue(); - - private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); - private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask()); - - private StorageTestUtil storageUtil; - private StateManager stateManager; - private TaskAssigner assigner; - private OfferManager offerManager; - private TaskScheduler scheduler; - private Preemptor preemptor; - private BiCache reservations; - private EventSink eventSink; - - @Before - public void setUp() throws Exception { - storageUtil = new StorageTestUtil(this); - stateManager = createMock(StateManager.class); - assigner = createMock(TaskAssigner.class); - offerManager = createMock(OfferManager.class); - preemptor = createMock(Preemptor.class); - reservations = createMock(new Clazz>() { }); - - Injector injector = getInjector(storageUtil.storage); - scheduler = injector.getInstance(TaskScheduler.class); - eventSink = PubsubTestUtil.startPubsub(injector); - } - - private Injector getInjector(final Storage storageImpl) { - return Guice.createInjector( - new PubsubEventModule(false), - new AbstractModule() { - @Override - protected void configure() { - bind(new TypeLiteral>() { }).toInstance(reservations); - bind(TaskScheduler.class).to(TaskSchedulerImpl.class); - bind(Preemptor.class).toInstance(preemptor); - bind(OfferManager.class).toInstance(offerManager); - bind(StateManager.class).toInstance(stateManager); - bind(TaskAssigner.class).toInstance(assigner); - bind(Clock.class).toInstance(createMock(Clock.class)); - bind(StatsProvider.class).toInstance(new FakeStatsProvider()); - bind(Storage.class).toInstance(storageImpl); - PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); - } - }); - } - - private void expectTaskStillPendingQuery(IScheduledTask task) { - storageUtil.expectTaskFetch( - Query.taskScoped(Tasks.id(task)).byStatus(PENDING), - ImmutableSet.of(task)); - } - - private void expectAssigned(IScheduledTask task) { - expect(assigner.maybeAssign( - storageUtil.mutableStoreProvider, - OFFER, - new ResourceRequest(task.getAssignedTask().getTask(), EMPTY), - Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); - } - - @Test - public void testReservation() throws Exception { - storageUtil.expectOperations(); - - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - // Reserve "a" with offerA - expectReservationCheck(TASK_A); - expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); - expectAddReservation(SLAVE_ID, TASK_A); - - // Use previously created reservation. - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - expectGetReservation(SLAVE_ID, TASK_A); - expectAssigned(TASK_A); - AssignmentCapture assignment = expectLaunchAttempt(true); - - control.replay(); - - assertFalse(scheduler.schedule("a")); - assertTrue(scheduler.schedule("a")); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); - } - - @Test - public void testReservationExpires() throws Exception { - storageUtil.expectOperations(); - - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - // Reserve "a" with offerA - expectReservationCheck(TASK_A); - expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); - expectAddReservation(SLAVE_ID, TASK_A); - - // First attempt -> reservation is active. - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture firstAssignment = expectLaunchAttempt(false); - expectGetReservation(SLAVE_ID, TASK_A); - expectReservationCheck(TASK_B); - expectPreemptorCall(TASK_B, Optional.absent()); - - // Status changed -> reservation removed. - reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask())); - - // Second attempt -> reservation expires. - expectGetNoReservation(SLAVE_ID); - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture secondAssignment = expectLaunchAttempt(true); - expectAssigned(TASK_B); - - control.replay(); - - assertFalse(scheduler.schedule("a")); - assertFalse(scheduler.schedule("b")); - assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment); - - eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING)); - assertTrue(scheduler.schedule("b")); - assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment); - } - - @Test - public void testReservationUnusable() throws Exception { - storageUtil.expectOperations(); - - expectTaskStillPendingQuery(TASK_A); - expectLaunchAttempt(false); - expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of(SLAVE_ID)); - - control.replay(); - - assertFalse(scheduler.schedule("a")); - } - - @Test - public void testNonPendingIgnored() throws Exception { - control.replay(); - - eventSink.post(TaskStateChange.transition(TASK_A, RUNNING)); - } - - @Test - public void testPendingDeletedHandled() throws Exception { - control.replay(); - - IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING)); - eventSink.post(TaskStateChange.transition(task, PENDING)); - } - - @Test - public void testIgnoresThrottledTasks() throws Exception { - // Ensures that tasks in THROTTLED state are not considered part of the active job state passed - // to the assigner function. - - Storage memStorage = DbUtil.createStorage(); - - Injector injector = getInjector(memStorage); - scheduler = injector.getInstance(TaskScheduler.class); - eventSink = PubsubTestUtil.startPubsub(injector); - - ScheduledTask builder = TASK_A.newBuilder(); - final IScheduledTask taskA = IScheduledTask.build(builder.setStatus(PENDING)); - builder.getAssignedTask().setTaskId("b"); - final IScheduledTask taskB = IScheduledTask.build(builder.setStatus(THROTTLED)); - - memStorage.write(new MutateWork.NoResult.Quiet() { - @Override - protected void execute(MutableStoreProvider store) { - store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA, taskB)); - } - }); - - expectGetNoReservation(SLAVE_ID); - AssignmentCapture assignment = expectLaunchAttempt(true); - expect(assigner.maybeAssign( - EasyMock.anyObject(), - eq(OFFER), - eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)), - eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); - - control.replay(); - - assertTrue(scheduler.schedule(Tasks.id(taskA))); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); - } - - private static class AssignmentCapture { - public Capture> assigner = createCapture(); - public Capture groupKey = createCapture(); - } - - private void expectPreemptorCall(IScheduledTask task, Optional result) { - expect(preemptor.attemptPreemptionFor( - task.getAssignedTask(), - EMPTY, - storageUtil.mutableStoreProvider)).andReturn(result); - } - - private AssignmentCapture expectLaunchAttempt(boolean taskLaunched) - throws OfferManager.LaunchException { - - AssignmentCapture capture = new AssignmentCapture(); - expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey))) - .andReturn(taskLaunched); - return capture; - } - - private IScheduledTask assign(IScheduledTask task, String slaveId) { - ScheduledTask result = task.newBuilder(); - result.getAssignedTask().setSlaveId(slaveId); - return IScheduledTask.build(result); - } - - private void assignAndAssert( - Result result, - TaskGroupKey groupKey, - HostOffer offer, - AssignmentCapture capture) { - - assertEquals(result, capture.assigner.getValue().apply(offer).getResult()); - assertEquals(groupKey, capture.groupKey.getValue()); - } - - private void expectActiveJobFetch(IScheduledTask task) { - storageUtil.expectTaskFetch( - Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task)) - .byStatus(Tasks.SLAVE_ASSIGNED_STATES), - ImmutableSet.of()); - } - - private void expectAddReservation(String slaveId, IScheduledTask task) { - reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask())); - } - - private IExpectationSetters expectGetReservation(String slaveId, IScheduledTask task) { - return expect(reservations.get(slaveId)) - .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask()))); - } - - private IExpectationSetters expectGetNoReservation(String slaveId) { - return expect(reservations.get(slaveId)).andReturn(Optional.absent()); - } - - private IExpectationSetters expectReservationCheck(IScheduledTask task) { - return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java deleted file mode 100644 index ed15401..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java +++ /dev/null @@ -1,669 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl; -import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; -import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; -import org.apache.aurora.scheduler.async.preemptor.BiCache; -import org.apache.aurora.scheduler.async.preemptor.Preemptor; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.MaintenanceController; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.mesos.Protos.SlaveID; -import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.TaskInfo; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IExpectationSetters; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINED; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.apache.mesos.Protos.Offer; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.isA; -import static org.junit.Assert.assertEquals; - -/** - * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl. - */ -public class TaskSchedulerTest extends EasyMockTest { - - private static final long FIRST_SCHEDULE_DELAY_MS = 1L; - - private static final HostOffer OFFER_A = makeOffer("OFFER_A", "HOST_A", NONE); - private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED); - private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING); - private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED); - private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue(); - private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue(); - private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue(); - - private Storage storage; - - private MaintenanceController maintenance; - private StateManager stateManager; - private TaskAssigner assigner; - private BackoffStrategy retryStrategy; - private Driver driver; - private ScheduledExecutorService executor; - private ScheduledFuture future; - private OfferReturnDelay returnDelay; - private OfferManager offerManager; - private TaskGroups taskGroups; - private RescheduleCalculator rescheduleCalculator; - private Preemptor preemptor; - private BiCache reservations; - - @Before - public void setUp() { - storage = DbUtil.createStorage(); - maintenance = createMock(MaintenanceController.class); - stateManager = createMock(StateManager.class); - assigner = createMock(TaskAssigner.class); - retryStrategy = createMock(BackoffStrategy.class); - driver = createMock(Driver.class); - executor = createMock(ScheduledExecutorService.class); - future = createMock(ScheduledFuture.class); - returnDelay = createMock(OfferReturnDelay.class); - rescheduleCalculator = createMock(RescheduleCalculator.class); - preemptor = createMock(Preemptor.class); - reservations = createMock(new Clazz>() { }); - } - - private void replayAndCreateScheduler() { - control.replay(); - offerManager = new OfferManagerImpl(driver, returnDelay, executor); - TaskScheduler scheduler = new TaskSchedulerImpl(storage, - stateManager, - assigner, - offerManager, - preemptor, - reservations); - taskGroups = new TaskGroups( - executor, - Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), - retryStrategy, - RateLimiter.create(100), - scheduler, - rescheduleCalculator); - } - - private Capture expectOffer() { - return expectOfferDeclineIn(10); - } - - private Capture expectOfferDeclineIn(long delayMillis) { - expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS)); - Capture runnable = createCapture(); - executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(createMock(ScheduledFuture.class)); - return runnable; - } - - private void changeState( - IScheduledTask task, - ScheduleStatus oldState, - ScheduleStatus newState) { - - final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState)); - // Insert the task if it doesn't already exist. - storage.write(new MutateWork.NoResult.Quiet() { - @Override - protected void execute(MutableStoreProvider storeProvider) { - TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); - if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) { - taskStore.saveTasks(ImmutableSet.of(copy)); - } - } - }); - taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState)); - } - - private Capture expectTaskRetryIn(long penaltyMs) { - Capture capture = createCapture(); - executor.schedule( - capture(capture), - eq(penaltyMs), - eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(future); - return capture; - } - - private Capture expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) { - expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs); - return expectTaskRetryIn(nextPenaltyMs); - } - - @Test - public void testNoTasks() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - } - - @Test - public void testNoOffers() { - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - IScheduledTask task = createTask("a"); - expectPreemptorCall(task.getAssignedTask()); - expectReservationCheck(task); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - timeoutCapture.getValue().run(); - } - - private IScheduledTask createTask(String taskId) { - return createTask(taskId, null); - } - - private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) { - return setStatus(makeTask(taskId, TaskTestUtil.JOB), status); - } - - private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) { - return IScheduledTask.build(task.newBuilder().setStatus(status)); - } - - @Test - public void testLoadFromStorage() { - final IScheduledTask a = createTask("a", KILLED); - final IScheduledTask b = createTask("b", PENDING); - final IScheduledTask c = createTask("c", RUNNING); - - expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L); - expectTaskRetryIn(10); - - replayAndCreateScheduler(); - - storage.write(new MutateWork.NoResult.Quiet() { - @Override - protected void execute(MutableStoreProvider store) { - store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c)); - } - }); - for (IScheduledTask task : ImmutableList.of(a, b, c)) { - taskGroups.taskChangedState(TaskStateChange.initialized(task)); - } - changeState(c, RUNNING, FINISHED); - } - - @Test - public void testTaskMissing() { - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT)); - timeoutCapture.getValue().run(); - } - - private IExpectationSetters expectMaybeAssign( - HostOffer offer, - IScheduledTask task, - AttributeAggregate jobAggregate) { - - return expect(assigner.maybeAssign( - EasyMock.anyObject(), - eq(offer), - eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)), - eq(Tasks.id(task)))); - } - - private IExpectationSetters expectNoReservation(String slaveId) { - return expect(reservations.get(slaveId)).andReturn(Optional.absent()); - } - - private IExpectationSetters expectReservationCheck(IScheduledTask task) { - return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of()); - } - - @Test - public void testTaskAssigned() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - - IScheduledTask taskA = createTask("a", PENDING); - TaskInfo mesosTask = makeTaskInfo(taskA); - - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectNoReservation(SLAVE_A).times(2); - expectReservationCheck(taskA); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure()); - expectPreemptorCall(taskA.getAssignedTask()); - - Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - - Capture timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - IScheduledTask taskB = createTask("b"); - expectReservationCheck(taskB); - expectPreemptorCall(taskB.getAssignedTask()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - changeState(taskA, INIT, PENDING); - timeoutCapture.getValue().run(); - timeoutCapture2.getValue().run(); - - // Ensure the offer was consumed. - changeState(taskB, INIT, PENDING); - timeoutCapture3.getValue().run(); - } - - @Test - public void testDriverNotReady() { - IScheduledTask task = createTask("a", PENDING); - TaskInfo mesosTask = TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) - .build(); - - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - expectLastCall().andThrow(new IllegalStateException("Driver not ready.")); - expect(stateManager.changeState( - EasyMock.anyObject(), - eq("a"), - eq(Optional.of(PENDING)), - eq(LOST), - eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG))) - .andReturn(StateChangeResult.SUCCESS); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - } - - @Test - public void testStorageException() { - IScheduledTask task = createTask("a", PENDING); - TaskInfo mesosTask = TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) - .build(); - - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectNoReservation(SLAVE_A).times(2); - expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure.")); - - Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - expectLastCall(); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - timeoutCapture2.getValue().run(); - } - - @Test - public void testExpiration() { - IScheduledTask task = createTask("a", PENDING); - - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - Capture offerExpirationCapture = expectOfferDeclineIn(10); - expectAnyMaintenanceCalls(); - expectNoReservation(SLAVE_A); - expectReservationCheck(task).times(2); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); - Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectPreemptorCall(task.getAssignedTask()); - driver.declineOffer(OFFER_A.getOffer().getId()); - expectTaskGroupBackoff(10, 20); - expectPreemptorCall(task.getAssignedTask()); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - offerExpirationCapture.getValue().run(); - timeoutCapture2.getValue().run(); - } - - @Test - public void testOneOfferPerSlave() { - expectAnyMaintenanceCalls(); - Capture offerExpirationCapture = expectOfferDeclineIn(10); - - HostOffer offerAB = new HostOffer( - Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(), - IHostAttributes.build(new HostAttributes())); - - driver.declineOffer(OFFER_A.getOffer().getId()); - driver.declineOffer(offerAB.getOffer().getId()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(offerAB); - offerExpirationCapture.getValue().run(); - } - - @Test - public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException { - expectAnyMaintenanceCalls(); - Capture offerExpirationCapture = expectOfferDeclineIn(10); - - Function offerAcceptor = - createMock(new Clazz>() { }); - final TaskInfo taskInfo = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo)); - driver.launchTask(OFFER_A.getOffer().getId(), taskInfo); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig()))); - offerExpirationCapture.getValue().run(); - } - - @Test - public void testBasicMaintenancePreferences() { - expectOffer(); - expectOffer(); - expectOffer(); - expectOffer(); - - IScheduledTask taskA = createTask("A", PENDING); - TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA); - Capture captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - IScheduledTask taskB = createTask("B", PENDING); - TaskInfo mesosTaskB = makeTaskInfo(taskB); - expectNoReservation(SLAVE_B); - expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); - driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB); - Capture captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_D); - offerManager.addOffer(OFFER_C); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_A); - - changeState(taskA, INIT, PENDING); - captureA.getValue().run(); - - changeState(taskB, INIT, PENDING); - captureB.getValue().run(); - } - - @Test - public void testChangingMaintenancePreferences() { - expectOffer(); - expectOffer(); - expectOffer(); - - IScheduledTask taskA = createTask("A", PENDING); - TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectNoReservation(SLAVE_B); - expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); - driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA); - Capture captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - IScheduledTask taskB = createTask("B", PENDING); - TaskInfo mesosTaskB = makeTaskInfo(taskB); - HostOffer updatedOfferC = new HostOffer( - OFFER_C.getOffer(), - IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE))); - expectNoReservation(SLAVE_C); - expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); - driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB); - Capture captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_C); - - // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable - - // Expected order now (B), with (C, A) unschedulable - changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING); - changeState(taskA, INIT, PENDING); - captureA.getValue().run(); - - // Expected order now (C), with (A) unschedulable and (B) already consumed - changeHostMaintenanceState(OFFER_C.getAttributes(), NONE); - changeState(taskB, INIT, PENDING); - captureB.getValue().run(); - } - - private Capture expectTaskScheduled(IScheduledTask task) { - TaskInfo mesosTask = makeTaskInfo(task); - Capture taskId = createCapture(); - expect(assigner.maybeAssign( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject(), - capture(taskId))).andReturn(Assignment.success(mesosTask)); - driver.launchTask(EasyMock.anyObject(), eq(mesosTask)); - return taskId; - } - - @Test - public void testResistsStarvation() { - // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside - // TaskScheduler. It's time to test using a real ScheduledExecutorService. - - expectAnyMaintenanceCalls(); - - IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING); - - ScheduledTask jobA1Builder = jobA0.newBuilder(); - jobA1Builder.getAssignedTask().setTaskId("a1"); - jobA1Builder.getAssignedTask().setInstanceId(1); - IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder); - - ScheduledTask jobA2Builder = jobA0.newBuilder(); - jobA2Builder.getAssignedTask().setTaskId("a2"); - jobA2Builder.getAssignedTask().setInstanceId(2); - IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder); - - IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING); - - expectNoReservation(SLAVE_A); - expectNoReservation(SLAVE_B); - - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - - Capture timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - Capture timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - Capture firstScheduled = expectTaskScheduled(jobA0); - Capture secondScheduled = expectTaskScheduled(jobB0); - - // Expect another watch of the task group for job A. - expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_C); - offerManager.addOffer(OFFER_D); - changeState(jobA0, INIT, PENDING); - changeState(jobA1, INIT, PENDING); - changeState(jobA2, INIT, PENDING); - changeState(jobB0, INIT, PENDING); - timeoutA.getValue().run(); - timeoutB.getValue().run(); - assertEquals( - ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)), - ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue())); - } - - @Test - public void testTaskDeleted() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - - final IScheduledTask task = createTask("a", PENDING); - - Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); - expectReservationCheck(task); - expectPreemptorCall(task.getAssignedTask()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - changeState(task, INIT, PENDING); - timeoutCapture.getValue().run(); - - // Ensure the offer was consumed. - changeState(task, INIT, PENDING); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - protected void execute(MutableStoreProvider storeProvider) { - storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task)); - } - }); - taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); - timeoutCapture.getValue().run(); - } - - private TaskInfo makeTaskInfo(IScheduledTask task) { - return TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString())) - .build(); - } - - private void expectAnyMaintenanceCalls() { - expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes(); - } - - private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) { - offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged( - IHostAttributes.build(attributes.newBuilder().setMode(mode)))); - } - - private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) { - Offer offer = Offers.makeOffer(offerId, hostName); - return new HostOffer( - offer, - IHostAttributes.build(new HostAttributes() - .setHost(hostName) - .setSlaveId(offer.getSlaveId().getValue()) - .setAttributes(ImmutableSet.of()) - .setMode(mode))); - } - - private void expectPreemptorCall(IAssignedTask task) { - expect(preemptor.attemptPreemptionFor( - eq(task), - eq(EMPTY), - EasyMock.anyObject())).andReturn(Optional.absent()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java deleted file mode 100644 index cfa9d81..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.easymock.Capture; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; - -public class TaskThrottlerTest extends EasyMockTest { - - private RescheduleCalculator rescheduleCalculator; - private FakeClock clock; - private ScheduledExecutorService executor; - private StorageTestUtil storageUtil; - private StateManager stateManager; - private TaskThrottler throttler; - - @Before - public void setUp() throws Exception { - rescheduleCalculator = createMock(RescheduleCalculator.class); - clock = new FakeClock(); - executor = createMock(ScheduledExecutorService.class); - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - stateManager = createMock(StateManager.class); - throttler = new TaskThrottler( - rescheduleCalculator, - clock, - executor, - storageUtil.storage, - stateManager); - } - - @Test - public void testIgnoresNonThrottledTasks() { - control.replay(); - - throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT)); - throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING)); - } - - @Test - public void testThrottledTask() { - IScheduledTask task = makeTask("a", THROTTLED); - - long penaltyMs = 100; - - expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs); - Capture stateChangeCapture = expectThrottled(penaltyMs); - expectMovedToPending(task); - - control.replay(); - - throttler.taskChangedState(TaskStateChange.transition(task, INIT)); - stateChangeCapture.getValue().run(); - } - - @Test - public void testThrottledTaskReady() { - // Ensures that a sane delay is used when the task's penalty was already expired when - // the -> THROTTLED transition occurred (such as in the event of a scheduler failover). - - IScheduledTask task = makeTask("a", THROTTLED); - - long penaltyMs = 100; - - expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs); - Capture stateChangeCapture = expectThrottled(0); - expectMovedToPending(task); - - control.replay(); - - clock.advance(Amount.of(1L, Time.HOURS)); - throttler.taskChangedState(TaskStateChange.transition(task, INIT)); - stateChangeCapture.getValue().run(); - } - - private Capture expectThrottled(long penaltyMs) { - Capture stateChangeCapture = createCapture(); - expect(executor.schedule( - capture(stateChangeCapture), - eq(penaltyMs), - eq(TimeUnit.MILLISECONDS))) - .andReturn(null); - return stateChangeCapture; - } - - private void expectMovedToPending(IScheduledTask task) { - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - Tasks.id(task), - Optional.of(THROTTLED), - PENDING, - Optional.absent())) - .andReturn(StateChangeResult.SUCCESS); - } - - private IScheduledTask makeTask(String id, ScheduleStatus status) { - return IScheduledTask.build(new ScheduledTask() - .setTaskEvents(ImmutableList.of( - new TaskEvent() - .setStatus(status) - .setTimestamp(clock.nowMillis()))) - .setStatus(status) - .setAssignedTask(new AssignedTask().setTaskId(id))); - } -}