Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0BD93173CC for ; Wed, 22 Jul 2015 19:40:02 +0000 (UTC) Received: (qmail 75897 invoked by uid 500); 22 Jul 2015 19:39:59 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 75862 invoked by uid 500); 22 Jul 2015 19:39:58 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 75840 invoked by uid 99); 22 Jul 2015 19:39:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2015 19:39:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B3F19DFC90; Wed, 22 Jul 2015 19:39:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Wed, 22 Jul 2015 19:39:58 -0000 Message-Id: <76b9bd048c524f7194b93975aab8959e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents. Repository: aurora Updated Branches: refs/heads/master 6e2bf577f -> 0070a5fd1 http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java new file mode 100644 index 0000000..97d25f9 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java @@ -0,0 +1,244 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.reconciliation; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; +import static org.apache.aurora.gen.ScheduleStatus.FINISHED; +import static org.apache.aurora.gen.ScheduleStatus.INIT; +import static org.apache.aurora.gen.ScheduleStatus.KILLED; +import static org.apache.aurora.gen.ScheduleStatus.KILLING; +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.gen.ScheduleStatus.STARTING; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class TaskTimeoutTest extends EasyMockTest { + + private static final String TASK_ID = "task_id"; + private static final Amount TIMEOUT = Amount.of(1L, Time.MINUTES); + + private AtomicLong timedOutTaskCounter; + private ScheduledExecutorService executor; + private StorageTestUtil storageUtil; + private ScheduledFuture future; + private StateManager stateManager; + private FakeClock clock; + private TaskTimeout timeout; + private StatsProvider statsProvider; + + @Before + public void setUp() { + executor = createMock(ScheduledExecutorService.class); + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + future = createMock(new Clazz>() { }); + stateManager = createMock(StateManager.class); + clock = new FakeClock(); + statsProvider = createMock(StatsProvider.class); + timedOutTaskCounter = new AtomicLong(); + expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER)) + .andReturn(timedOutTaskCounter); + } + + private void replayAndCreate() { + control.replay(); + timeout = new TaskTimeout( + executor, + storageUtil.storage, + stateManager, + TIMEOUT, + statsProvider); + timeout.startAsync().awaitRunning(); + } + + private Capture expectTaskWatch(Amount expireIn) { + Capture capture = createCapture(); + executor.schedule( + EasyMock.capture(capture), + eq((long) expireIn.getValue()), + eq(expireIn.getUnit().getTimeUnit())); + expectLastCall().andReturn(future); + return capture; + } + + private Capture expectTaskWatch() { + return expectTaskWatch(TIMEOUT); + } + + private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) { + IScheduledTask task = IScheduledTask.build(new ScheduledTask() + .setStatus(to) + .setAssignedTask(new AssignedTask().setTaskId(taskId))); + timeout.recordStateChange(TaskStateChange.transition(task, from)); + } + + private void changeState(ScheduleStatus from, ScheduleStatus to) { + changeState(TASK_ID, from, to); + } + + @Test + public void testNormalTransitions() { + expectTaskWatch(); + expectTaskWatch(); + + replayAndCreate(); + + changeState(INIT, PENDING); + changeState(PENDING, ASSIGNED); + changeState(ASSIGNED, STARTING); + changeState(STARTING, RUNNING); + changeState(RUNNING, KILLING); + changeState(KILLING, KILLED); + } + + @Test + public void testTransientToTransient() { + expectTaskWatch(); + Capture killingTimeout = expectTaskWatch(); + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID, + Optional.of(KILLING), + LOST, + TaskTimeout.TIMEOUT_MESSAGE)) + .andReturn(StateChangeResult.SUCCESS); + + replayAndCreate(); + + changeState(PENDING, ASSIGNED); + changeState(ASSIGNED, KILLING); + killingTimeout.getValue().run(); + } + + @Test + public void testTimeout() throws Exception { + Capture assignedTimeout = expectTaskWatch(); + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID, + Optional.of(ASSIGNED), + LOST, + TaskTimeout.TIMEOUT_MESSAGE)) + .andReturn(StateChangeResult.SUCCESS); + + replayAndCreate(); + + changeState(INIT, PENDING); + changeState(PENDING, ASSIGNED); + assignedTimeout.getValue().run(); + assertEquals(timedOutTaskCounter.intValue(), 1); + } + + @Test + public void testTaskDeleted() throws Exception { + Capture assignedTimeout = expectTaskWatch(); + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID, + Optional.of(KILLING), + LOST, + TaskTimeout.TIMEOUT_MESSAGE)) + .andReturn(StateChangeResult.ILLEGAL); + + replayAndCreate(); + + changeState(INIT, PENDING); + changeState(PENDING, KILLING); + assignedTimeout.getValue().run(); + assertEquals(timedOutTaskCounter.intValue(), 0); + } + + private static IScheduledTask makeTask( + String taskId, + ScheduleStatus status, + long stateEnteredMs) { + + return IScheduledTask.build(new ScheduledTask() + .setStatus(status) + .setTaskEvents(ImmutableList.of(new TaskEvent(stateEnteredMs, status))) + .setAssignedTask(new AssignedTask() + .setTaskId(taskId) + .setTask(new TaskConfig()))); + } + + @Test + public void testStorageStart() { + expectTaskWatch(TIMEOUT); + expectTaskWatch(TIMEOUT); + expectTaskWatch(TIMEOUT); + + replayAndCreate(); + + clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2); + for (IScheduledTask task : ImmutableList.of( + makeTask("a", ASSIGNED, 0), + makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)), + makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) { + + timeout.recordStateChange(TaskStateChange.initialized(task)); + } + + changeState("a", ASSIGNED, RUNNING); + changeState("b", KILLING, KILLED); + changeState("c", PREEMPTING, FINISHED); + } + + @Test + public void testTimeoutWhileNotStarted() throws Exception { + // Since the timeout is never instructed to start, it should not attempt to transition tasks, + // but it should try again later. + Capture assignedTimeout = expectTaskWatch(); + expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY); + + control.replay(); + timeout = new TaskTimeout(executor, storageUtil.storage, stateManager, TIMEOUT, statsProvider); + + changeState(INIT, PENDING); + changeState(PENDING, ASSIGNED); + assignedTimeout.getValue().run(); + assertEquals(timedOutTaskCounter.intValue(), 0); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java new file mode 100644 index 0000000..7936b29 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java @@ -0,0 +1,188 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.Map; +import java.util.Map.Entry; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.BackoffStrategy; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.FINISHED; +import static org.apache.aurora.gen.ScheduleStatus.INIT; +import static org.apache.aurora.gen.ScheduleStatus.KILLED; +import static org.apache.aurora.gen.ScheduleStatus.KILLING; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class RescheduleCalculatorImplTest extends EasyMockTest { + + private static final Amount FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES); + private static final Amount MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES); + + private StorageTestUtil storageUtil; + private BackoffStrategy backoff; + private RescheduleCalculator rescheduleCalculator; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + backoff = createMock(BackoffStrategy.class); + rescheduleCalculator = new RescheduleCalculatorImpl( + storageUtil.storage, + new RescheduleCalculatorImpl.RescheduleCalculatorSettings( + backoff, + FLAPPING_THRESHOLD, + MAX_STARTUP_DELAY)); + storageUtil.expectOperations(); + } + + @Test + public void testNoPenaltyForNoAncestor() { + control.replay(); + + assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT))); + } + + @Test + public void testNoPenaltyDeletedAncestor() { + String ancestorId = "a"; + storageUtil.expectTaskFetch(Query.taskScoped(ancestorId)); + + control.replay(); + + assertEquals( + 0L, + rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId))); + } + + @Test + public void testFlappingTask() { + IScheduledTask ancestor = makeFlappyTask("a"); + storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); + long penaltyMs = 1000L; + expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs); + + control.replay(); + + assertEquals( + penaltyMs, + rescheduleCalculator.getFlappingPenaltyMs( + setAncestor(makeTask("b", INIT), Tasks.id(ancestor)))); + } + + @Test + public void testFlappingTasksBackoffTruncation() { + // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting + // ancestors once truncated. + IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried"); + IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA)); + IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB)); + IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC)); + + Map ancestorsAndPenalties = ImmutableMap.of( + taskD, 100L, + taskC, 200L, + taskB, 300L, + taskA, 300L); + + long lastPenalty = 0L; + for (Map.Entry taskAndPenalty : ancestorsAndPenalties.entrySet()) { + storageUtil.expectTaskFetch( + Query.taskScoped(Tasks.id(taskAndPenalty.getKey())), + taskAndPenalty.getKey()); + expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue()); + lastPenalty = taskAndPenalty.getValue(); + } + + control.replay(); + + IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD)); + assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask)); + } + + @Test + public void testNoPenaltyForInterruptedTasks() { + IScheduledTask ancestor = setEvents( + makeTask("a", KILLED), + ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L)); + storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor); + + control.replay(); + + assertEquals( + 0L, + rescheduleCalculator.getFlappingPenaltyMs( + setAncestor(makeTask("b", INIT), Tasks.id(ancestor)))); + } + + private IScheduledTask makeFlappyTask(String taskId) { + return setEvents( + makeTask(taskId, FINISHED), + ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L)); + } + + private IScheduledTask makeTask(String taskId) { + return IScheduledTask.build(new ScheduledTask() + .setAssignedTask(new AssignedTask() + .setInstanceId(0) + .setTaskId(taskId) + .setTask(new TaskConfig() + .setJobName("job-" + taskId) + .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId)) + .setEnvironment("env-" + taskId)))); + } + + private IScheduledTask makeTask(String taskId, ScheduleStatus status) { + return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status)); + } + + private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) { + return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId)); + } + + private static final Function, TaskEvent> TO_EVENT = + new Function, TaskEvent>() { + @Override + public TaskEvent apply(Entry input) { + return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue()); + } + }; + + private IScheduledTask setEvents(IScheduledTask task, Map events) { + return IScheduledTask.build(task.newBuilder().setTaskEvents( + FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java new file mode 100644 index 0000000..10de372 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java @@ -0,0 +1,140 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; + +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.RateLimiter; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.BackoffStrategy; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.apache.aurora.gen.ScheduleStatus.INIT; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +public class TaskGroupsTest extends EasyMockTest { + + private static final long FIRST_SCHEDULE_DELAY_MS = 1L; + + private ScheduledExecutorService executor; + private BackoffStrategy backoffStrategy; + private TaskScheduler taskScheduler; + private RateLimiter rateLimiter; + + private TaskGroups taskGroups; + + @Before + public void setUp() throws Exception { + executor = createMock(ScheduledExecutorService.class); + backoffStrategy = createMock(BackoffStrategy.class); + taskScheduler = createMock(TaskScheduler.class); + rateLimiter = createMock(RateLimiter.class); + taskGroups = new TaskGroups( + executor, + Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), + backoffStrategy, + rateLimiter, + taskScheduler, + createMock(RescheduleCalculator.class)); + } + + @Test + public void testEvaluatedAfterFirstSchedulePenalty() { + executor.schedule( + EasyMock.anyObject(), + EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), + EasyMock.eq(MILLISECONDS)); + expectLastCall().andAnswer(new IAnswer>() { + @Override + public ScheduledFuture answer() { + ((Runnable) EasyMock.getCurrentArguments()[0]).run(); + return null; + } + }); + expect(rateLimiter.acquire()).andReturn(0D); + expect(taskScheduler.schedule("a")).andReturn(true); + + control.replay(); + + taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT)); + } + + private Capture expectEvaluate() { + Capture capture = createCapture(); + executor.schedule( + EasyMock.capture(capture), + EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), + EasyMock.eq(MILLISECONDS)); + expectLastCall().andReturn(null); + return capture; + } + + @Test + public void testTaskDeletedBeforeEvaluating() { + final IScheduledTask task = makeTask("a"); + + Capture evaluate = expectEvaluate(); + + expect(rateLimiter.acquire()).andReturn(0D); + expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer() { + @Override + public Boolean answer() { + // Test a corner case where a task is deleted while it is being evaluated by the task + // scheduler. If not handled carefully, this could result in the scheduler trying again + // later to satisfy the deleted task. + taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); + + return false; + } + }); + expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY_MS)).andReturn(0L); + + control.replay(); + + taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT)); + evaluate.getValue().run(); + } + + private static IScheduledTask makeTask(String id) { + return IScheduledTask.build(new ScheduledTask() + .setStatus(ScheduleStatus.PENDING) + .setAssignedTask(new AssignedTask() + .setTaskId(id) + .setTask(new TaskConfig() + .setOwner(new Identity("owner", "owner")) + .setEnvironment("test") + .setJobName("job")))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java new file mode 100644 index 0000000..25d8da6 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -0,0 +1,342 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.TypeLiteral; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.Clock; + +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.preemptor.Preemptor; +import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl; +import org.apache.aurora.scheduler.state.PubsubTestUtil; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.state.TaskAssigner; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.db.DbUtil; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos.TaskInfo; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TaskSchedulerImplTest extends EasyMockTest { + + private static final IScheduledTask TASK_A = + TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a")); + private static final IScheduledTask TASK_B = + TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b")); + private static final HostOffer OFFER = new HostOffer( + Offers.makeOffer("OFFER_A", "HOST_A"), + IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); + + private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue(); + + private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask()); + private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask()); + + private StorageTestUtil storageUtil; + private StateManager stateManager; + private TaskAssigner assigner; + private OfferManager offerManager; + private TaskScheduler scheduler; + private Preemptor preemptor; + private BiCache reservations; + private EventSink eventSink; + + @Before + public void setUp() throws Exception { + storageUtil = new StorageTestUtil(this); + stateManager = createMock(StateManager.class); + assigner = createMock(TaskAssigner.class); + offerManager = createMock(OfferManager.class); + preemptor = createMock(Preemptor.class); + reservations = createMock(new Clazz>() { }); + + Injector injector = getInjector(storageUtil.storage); + scheduler = injector.getInstance(TaskScheduler.class); + eventSink = PubsubTestUtil.startPubsub(injector); + } + + private Injector getInjector(final Storage storageImpl) { + return Guice.createInjector( + new PubsubEventModule(false), + new AbstractModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { }).toInstance(reservations); + bind(TaskScheduler.class).to(TaskSchedulerImpl.class); + bind(Preemptor.class).toInstance(preemptor); + bind(OfferManager.class).toInstance(offerManager); + bind(StateManager.class).toInstance(stateManager); + bind(TaskAssigner.class).toInstance(assigner); + bind(Clock.class).toInstance(createMock(Clock.class)); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Storage.class).toInstance(storageImpl); + PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); + } + }); + } + + private void expectTaskStillPendingQuery(IScheduledTask task) { + storageUtil.expectTaskFetch( + Query.taskScoped(Tasks.id(task)).byStatus(PENDING), + ImmutableSet.of(task)); + } + + private void expectAssigned(IScheduledTask task) { + expect(assigner.maybeAssign( + storageUtil.mutableStoreProvider, + OFFER, + new ResourceRequest(task.getAssignedTask().getTask(), EMPTY), + Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); + } + + @Test + public void testReservation() throws Exception { + storageUtil.expectOperations(); + + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectLaunchAttempt(false); + // Reserve "a" with offerA + expectReservationCheck(TASK_A); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); + expectAddReservation(SLAVE_ID, TASK_A); + + // Use previously created reservation. + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectGetReservation(SLAVE_ID, TASK_A); + expectAssigned(TASK_A); + AssignmentCapture assignment = expectLaunchAttempt(true); + + control.replay(); + + assertFalse(scheduler.schedule("a")); + assertTrue(scheduler.schedule("a")); + assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); + } + + @Test + public void testReservationExpires() throws Exception { + storageUtil.expectOperations(); + + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectLaunchAttempt(false); + // Reserve "a" with offerA + expectReservationCheck(TASK_A); + expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); + expectAddReservation(SLAVE_ID, TASK_A); + + // First attempt -> reservation is active. + expectTaskStillPendingQuery(TASK_B); + expectActiveJobFetch(TASK_B); + AssignmentCapture firstAssignment = expectLaunchAttempt(false); + expectGetReservation(SLAVE_ID, TASK_A); + expectReservationCheck(TASK_B); + expectPreemptorCall(TASK_B, Optional.absent()); + + // Status changed -> reservation removed. + reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask())); + + // Second attempt -> reservation expires. + expectGetNoReservation(SLAVE_ID); + expectTaskStillPendingQuery(TASK_B); + expectActiveJobFetch(TASK_B); + AssignmentCapture secondAssignment = expectLaunchAttempt(true); + expectAssigned(TASK_B); + + control.replay(); + + assertFalse(scheduler.schedule("a")); + assertFalse(scheduler.schedule("b")); + assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment); + + eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING)); + assertTrue(scheduler.schedule("b")); + assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment); + } + + @Test + public void testReservationUnusable() throws Exception { + storageUtil.expectOperations(); + + expectTaskStillPendingQuery(TASK_A); + expectLaunchAttempt(false); + expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of(SLAVE_ID)); + + control.replay(); + + assertFalse(scheduler.schedule("a")); + } + + @Test + public void testNonPendingIgnored() throws Exception { + control.replay(); + + eventSink.post(TaskStateChange.transition(TASK_A, RUNNING)); + } + + @Test + public void testPendingDeletedHandled() throws Exception { + control.replay(); + + IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING)); + eventSink.post(TaskStateChange.transition(task, PENDING)); + } + + @Test + public void testIgnoresThrottledTasks() throws Exception { + // Ensures that tasks in THROTTLED state are not considered part of the active job state passed + // to the assigner function. + + Storage memStorage = DbUtil.createStorage(); + + Injector injector = getInjector(memStorage); + scheduler = injector.getInstance(TaskScheduler.class); + eventSink = PubsubTestUtil.startPubsub(injector); + + ScheduledTask builder = TASK_A.newBuilder(); + final IScheduledTask taskA = IScheduledTask.build(builder.setStatus(PENDING)); + builder.getAssignedTask().setTaskId("b"); + final IScheduledTask taskB = IScheduledTask.build(builder.setStatus(THROTTLED)); + + memStorage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider store) { + store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA, taskB)); + } + }); + + expectGetNoReservation(SLAVE_ID); + AssignmentCapture assignment = expectLaunchAttempt(true); + expect(assigner.maybeAssign( + EasyMock.anyObject(), + eq(OFFER), + eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)), + eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance())); + + control.replay(); + + assertTrue(scheduler.schedule(Tasks.id(taskA))); + assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); + } + + private static class AssignmentCapture { + public Capture> assigner = createCapture(); + public Capture groupKey = createCapture(); + } + + private void expectPreemptorCall(IScheduledTask task, Optional result) { + expect(preemptor.attemptPreemptionFor( + task.getAssignedTask(), + EMPTY, + storageUtil.mutableStoreProvider)).andReturn(result); + } + + private AssignmentCapture expectLaunchAttempt(boolean taskLaunched) + throws OfferManager.LaunchException { + + AssignmentCapture capture = new AssignmentCapture(); + expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey))) + .andReturn(taskLaunched); + return capture; + } + + private IScheduledTask assign(IScheduledTask task, String slaveId) { + ScheduledTask result = task.newBuilder(); + result.getAssignedTask().setSlaveId(slaveId); + return IScheduledTask.build(result); + } + + private void assignAndAssert( + Result result, + TaskGroupKey groupKey, + HostOffer offer, + AssignmentCapture capture) { + + assertEquals(result, capture.assigner.getValue().apply(offer).getResult()); + assertEquals(groupKey, capture.groupKey.getValue()); + } + + private void expectActiveJobFetch(IScheduledTask task) { + storageUtil.expectTaskFetch( + Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task)) + .byStatus(Tasks.SLAVE_ASSIGNED_STATES), + ImmutableSet.of()); + } + + private void expectAddReservation(String slaveId, IScheduledTask task) { + reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask())); + } + + private IExpectationSetters expectGetReservation(String slaveId, IScheduledTask task) { + return expect(reservations.get(slaveId)) + .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask()))); + } + + private IExpectationSetters expectGetNoReservation(String slaveId) { + return expect(reservations.get(slaveId)).andReturn(Optional.absent()); + } + + private IExpectationSetters expectReservationCheck(IScheduledTask task) { + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java new file mode 100644 index 0000000..52af364 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java @@ -0,0 +1,671 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.RateLimiter; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.BackoffStrategy; + +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; +import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay; +import org.apache.aurora.scheduler.offers.Offers; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.preemptor.Preemptor; +import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl; +import org.apache.aurora.scheduler.state.MaintenanceController; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.state.TaskAssigner; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.db.DbUtil; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.mesos.Protos.SlaveID; +import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.Protos.TaskInfo; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINED; +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; +import static org.apache.aurora.gen.ScheduleStatus.FINISHED; +import static org.apache.aurora.gen.ScheduleStatus.INIT; +import static org.apache.aurora.gen.ScheduleStatus.KILLED; +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.mesos.Protos.Offer; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.isA; +import static org.junit.Assert.assertEquals; + +/** + * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl. + */ +public class TaskSchedulerTest extends EasyMockTest { + + private static final long FIRST_SCHEDULE_DELAY_MS = 1L; + + private static final HostOffer OFFER_A = makeOffer("OFFER_A", "HOST_A", NONE); + private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED); + private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING); + private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED); + private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue(); + private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue(); + private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue(); + + private Storage storage; + + private MaintenanceController maintenance; + private StateManager stateManager; + private TaskAssigner assigner; + private BackoffStrategy retryStrategy; + private Driver driver; + private ScheduledExecutorService executor; + private ScheduledFuture future; + private OfferReturnDelay returnDelay; + private OfferManager offerManager; + private TaskGroups taskGroups; + private RescheduleCalculator rescheduleCalculator; + private Preemptor preemptor; + private BiCache reservations; + + @Before + public void setUp() { + storage = DbUtil.createStorage(); + maintenance = createMock(MaintenanceController.class); + stateManager = createMock(StateManager.class); + assigner = createMock(TaskAssigner.class); + retryStrategy = createMock(BackoffStrategy.class); + driver = createMock(Driver.class); + executor = createMock(ScheduledExecutorService.class); + future = createMock(ScheduledFuture.class); + returnDelay = createMock(OfferReturnDelay.class); + rescheduleCalculator = createMock(RescheduleCalculator.class); + preemptor = createMock(Preemptor.class); + reservations = createMock(new Clazz>() { }); + } + + private void replayAndCreateScheduler() { + control.replay(); + offerManager = new OfferManagerImpl(driver, returnDelay, executor); + TaskScheduler scheduler = new TaskSchedulerImpl(storage, + stateManager, + assigner, + offerManager, + preemptor, + reservations); + taskGroups = new TaskGroups( + executor, + Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), + retryStrategy, + RateLimiter.create(100), + scheduler, + rescheduleCalculator); + } + + private Capture expectOffer() { + return expectOfferDeclineIn(10); + } + + private Capture expectOfferDeclineIn(long delayMillis) { + expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS)); + Capture runnable = createCapture(); + executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS)); + expectLastCall().andReturn(createMock(ScheduledFuture.class)); + return runnable; + } + + private void changeState( + IScheduledTask task, + ScheduleStatus oldState, + ScheduleStatus newState) { + + final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState)); + // Insert the task if it doesn't already exist. + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); + if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) { + taskStore.saveTasks(ImmutableSet.of(copy)); + } + } + }); + taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState)); + } + + private Capture expectTaskRetryIn(long penaltyMs) { + Capture capture = createCapture(); + executor.schedule( + capture(capture), + eq(penaltyMs), + eq(TimeUnit.MILLISECONDS)); + expectLastCall().andReturn(future); + return capture; + } + + private Capture expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) { + expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs); + return expectTaskRetryIn(nextPenaltyMs); + } + + @Test + public void testNoTasks() { + expectAnyMaintenanceCalls(); + expectOfferDeclineIn(10); + expectOfferDeclineIn(10); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + offerManager.addOffer(OFFER_B); + } + + @Test + public void testNoOffers() { + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); + IScheduledTask task = createTask("a"); + expectPreemptorCall(task.getAssignedTask()); + expectReservationCheck(task); + + replayAndCreateScheduler(); + + changeState(task, INIT, PENDING); + timeoutCapture.getValue().run(); + } + + private IScheduledTask createTask(String taskId) { + return createTask(taskId, null); + } + + private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) { + return setStatus(makeTask(taskId, TaskTestUtil.JOB), status); + } + + private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) { + return IScheduledTask.build(task.newBuilder().setStatus(status)); + } + + @Test + public void testLoadFromStorage() { + final IScheduledTask a = createTask("a", KILLED); + final IScheduledTask b = createTask("b", PENDING); + final IScheduledTask c = createTask("c", RUNNING); + + expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L); + expectTaskRetryIn(10); + + replayAndCreateScheduler(); + + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider store) { + store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c)); + } + }); + for (IScheduledTask task : ImmutableList.of(a, b, c)) { + taskGroups.taskChangedState(TaskStateChange.initialized(task)); + } + changeState(c, RUNNING, FINISHED); + } + + @Test + public void testTaskMissing() { + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + replayAndCreateScheduler(); + + taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT)); + timeoutCapture.getValue().run(); + } + + private IExpectationSetters expectMaybeAssign( + HostOffer offer, + IScheduledTask task, + AttributeAggregate jobAggregate) { + + return expect(assigner.maybeAssign( + EasyMock.anyObject(), + eq(offer), + eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)), + eq(Tasks.id(task)))); + } + + private IExpectationSetters expectNoReservation(String slaveId) { + return expect(reservations.get(slaveId)).andReturn(Optional.absent()); + } + + private IExpectationSetters expectReservationCheck(IScheduledTask task) { + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of()); + } + + @Test + public void testTaskAssigned() { + expectAnyMaintenanceCalls(); + expectOfferDeclineIn(10); + + IScheduledTask taskA = createTask("a", PENDING); + TaskInfo mesosTask = makeTaskInfo(taskA); + + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectNoReservation(SLAVE_A).times(2); + expectReservationCheck(taskA); + expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure()); + expectPreemptorCall(taskA.getAssignedTask()); + + Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); + expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask)); + driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); + + Capture timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); + IScheduledTask taskB = createTask("b"); + expectReservationCheck(taskB); + expectPreemptorCall(taskB.getAssignedTask()); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + changeState(taskA, INIT, PENDING); + timeoutCapture.getValue().run(); + timeoutCapture2.getValue().run(); + + // Ensure the offer was consumed. + changeState(taskB, INIT, PENDING); + timeoutCapture3.getValue().run(); + } + + @Test + public void testDriverNotReady() { + IScheduledTask task = createTask("a", PENDING); + TaskInfo mesosTask = TaskInfo.newBuilder() + .setName(Tasks.id(task)) + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) + .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) + .build(); + + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectAnyMaintenanceCalls(); + expectOfferDeclineIn(10); + expectNoReservation(SLAVE_A); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); + driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); + expectLastCall().andThrow(new IllegalStateException("Driver not ready.")); + expect(stateManager.changeState( + EasyMock.anyObject(), + eq("a"), + eq(Optional.of(PENDING)), + eq(LOST), + eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG))) + .andReturn(StateChangeResult.SUCCESS); + + replayAndCreateScheduler(); + + changeState(task, INIT, PENDING); + offerManager.addOffer(OFFER_A); + timeoutCapture.getValue().run(); + } + + @Test + public void testStorageException() { + IScheduledTask task = createTask("a", PENDING); + TaskInfo mesosTask = TaskInfo.newBuilder() + .setName(Tasks.id(task)) + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) + .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) + .build(); + + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectAnyMaintenanceCalls(); + expectOfferDeclineIn(10); + expectNoReservation(SLAVE_A).times(2); + expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure.")); + + Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); + driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); + expectLastCall(); + + replayAndCreateScheduler(); + + changeState(task, INIT, PENDING); + offerManager.addOffer(OFFER_A); + timeoutCapture.getValue().run(); + timeoutCapture2.getValue().run(); + } + + @Test + public void testExpiration() { + IScheduledTask task = createTask("a", PENDING); + + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + Capture offerExpirationCapture = expectOfferDeclineIn(10); + expectAnyMaintenanceCalls(); + expectNoReservation(SLAVE_A); + expectReservationCheck(task).times(2); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); + Capture timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); + expectPreemptorCall(task.getAssignedTask()); + driver.declineOffer(OFFER_A.getOffer().getId()); + expectTaskGroupBackoff(10, 20); + expectPreemptorCall(task.getAssignedTask()); + + replayAndCreateScheduler(); + + changeState(task, INIT, PENDING); + offerManager.addOffer(OFFER_A); + timeoutCapture.getValue().run(); + offerExpirationCapture.getValue().run(); + timeoutCapture2.getValue().run(); + } + + @Test + public void testOneOfferPerSlave() { + expectAnyMaintenanceCalls(); + Capture offerExpirationCapture = expectOfferDeclineIn(10); + + HostOffer offerAB = new HostOffer( + Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(), + IHostAttributes.build(new HostAttributes())); + + driver.declineOffer(OFFER_A.getOffer().getId()); + driver.declineOffer(offerAB.getOffer().getId()); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + offerManager.addOffer(offerAB); + offerExpirationCapture.getValue().run(); + } + + @Test + public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException { + expectAnyMaintenanceCalls(); + Capture offerExpirationCapture = expectOfferDeclineIn(10); + + Function offerAcceptor = + createMock(new Clazz>() { }); + final TaskInfo taskInfo = TaskInfo.getDefaultInstance(); + expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo)); + driver.launchTask(OFFER_A.getOffer().getId(), taskInfo); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig()))); + offerExpirationCapture.getValue().run(); + } + + @Test + public void testBasicMaintenancePreferences() { + expectOffer(); + expectOffer(); + expectOffer(); + expectOffer(); + + IScheduledTask taskA = createTask("A", PENDING); + TaskInfo mesosTaskA = makeTaskInfo(taskA); + expectNoReservation(SLAVE_A); + expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); + driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA); + Capture captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + IScheduledTask taskB = createTask("B", PENDING); + TaskInfo mesosTaskB = makeTaskInfo(taskB); + expectNoReservation(SLAVE_B); + expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); + driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB); + Capture captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_D); + offerManager.addOffer(OFFER_C); + offerManager.addOffer(OFFER_B); + offerManager.addOffer(OFFER_A); + + changeState(taskA, INIT, PENDING); + captureA.getValue().run(); + + changeState(taskB, INIT, PENDING); + captureB.getValue().run(); + } + + @Test + public void testChangingMaintenancePreferences() { + expectOffer(); + expectOffer(); + expectOffer(); + + IScheduledTask taskA = createTask("A", PENDING); + TaskInfo mesosTaskA = makeTaskInfo(taskA); + expectNoReservation(SLAVE_B); + expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); + driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA); + Capture captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + IScheduledTask taskB = createTask("B", PENDING); + TaskInfo mesosTaskB = makeTaskInfo(taskB); + HostOffer updatedOfferC = new HostOffer( + OFFER_C.getOffer(), + IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE))); + expectNoReservation(SLAVE_C); + expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); + driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB); + Capture captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + offerManager.addOffer(OFFER_B); + offerManager.addOffer(OFFER_C); + + // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable + + // Expected order now (B), with (C, A) unschedulable + changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING); + changeState(taskA, INIT, PENDING); + captureA.getValue().run(); + + // Expected order now (C), with (A) unschedulable and (B) already consumed + changeHostMaintenanceState(OFFER_C.getAttributes(), NONE); + changeState(taskB, INIT, PENDING); + captureB.getValue().run(); + } + + private Capture expectTaskScheduled(IScheduledTask task) { + TaskInfo mesosTask = makeTaskInfo(task); + Capture taskId = createCapture(); + expect(assigner.maybeAssign( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + capture(taskId))).andReturn(Assignment.success(mesosTask)); + driver.launchTask(EasyMock.anyObject(), eq(mesosTask)); + return taskId; + } + + @Test + public void testResistsStarvation() { + // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside + // TaskScheduler. It's time to test using a real ScheduledExecutorService. + + expectAnyMaintenanceCalls(); + + IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING); + + ScheduledTask jobA1Builder = jobA0.newBuilder(); + jobA1Builder.getAssignedTask().setTaskId("a1"); + jobA1Builder.getAssignedTask().setInstanceId(1); + IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder); + + ScheduledTask jobA2Builder = jobA0.newBuilder(); + jobA2Builder.getAssignedTask().setTaskId("a2"); + jobA2Builder.getAssignedTask().setInstanceId(2); + IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder); + + IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING); + + expectNoReservation(SLAVE_A); + expectNoReservation(SLAVE_B); + + expectOfferDeclineIn(10); + expectOfferDeclineIn(10); + expectOfferDeclineIn(10); + expectOfferDeclineIn(10); + + Capture timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + Capture timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + Capture firstScheduled = expectTaskScheduled(jobA0); + Capture secondScheduled = expectTaskScheduled(jobB0); + + // Expect another watch of the task group for job A. + expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + offerManager.addOffer(OFFER_B); + offerManager.addOffer(OFFER_C); + offerManager.addOffer(OFFER_D); + changeState(jobA0, INIT, PENDING); + changeState(jobA1, INIT, PENDING); + changeState(jobA2, INIT, PENDING); + changeState(jobB0, INIT, PENDING); + timeoutA.getValue().run(); + timeoutB.getValue().run(); + assertEquals( + ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)), + ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue())); + } + + @Test + public void testTaskDeleted() { + expectAnyMaintenanceCalls(); + expectOfferDeclineIn(10); + + final IScheduledTask task = createTask("a", PENDING); + + Capture timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectNoReservation(SLAVE_A); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); + expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); + expectReservationCheck(task); + expectPreemptorCall(task.getAssignedTask()); + + replayAndCreateScheduler(); + + offerManager.addOffer(OFFER_A); + changeState(task, INIT, PENDING); + timeoutCapture.getValue().run(); + + // Ensure the offer was consumed. + changeState(task, INIT, PENDING); + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task)); + } + }); + taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); + timeoutCapture.getValue().run(); + } + + private TaskInfo makeTaskInfo(IScheduledTask task) { + return TaskInfo.newBuilder() + .setName(Tasks.id(task)) + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) + .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString())) + .build(); + } + + private void expectAnyMaintenanceCalls() { + expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes(); + } + + private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) { + offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged( + IHostAttributes.build(attributes.newBuilder().setMode(mode)))); + } + + private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) { + Offer offer = Offers.makeOffer(offerId, hostName); + return new HostOffer( + offer, + IHostAttributes.build(new HostAttributes() + .setHost(hostName) + .setSlaveId(offer.getSlaveId().getValue()) + .setAttributes(ImmutableSet.of()) + .setMode(mode))); + } + + private void expectPreemptorCall(IAssignedTask task) { + expect(preemptor.attemptPreemptionFor( + eq(task), + eq(EMPTY), + EasyMock.anyObject())).andReturn(Optional.absent()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java new file mode 100644 index 0000000..4055021 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java @@ -0,0 +1,146 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.INIT; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; + +public class TaskThrottlerTest extends EasyMockTest { + + private RescheduleCalculator rescheduleCalculator; + private FakeClock clock; + private ScheduledExecutorService executor; + private StorageTestUtil storageUtil; + private StateManager stateManager; + private TaskThrottler throttler; + + @Before + public void setUp() throws Exception { + rescheduleCalculator = createMock(RescheduleCalculator.class); + clock = new FakeClock(); + executor = createMock(ScheduledExecutorService.class); + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + stateManager = createMock(StateManager.class); + throttler = new TaskThrottler( + rescheduleCalculator, + clock, + executor, + storageUtil.storage, + stateManager); + } + + @Test + public void testIgnoresNonThrottledTasks() { + control.replay(); + + throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT)); + throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING)); + } + + @Test + public void testThrottledTask() { + IScheduledTask task = makeTask("a", THROTTLED); + + long penaltyMs = 100; + + expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs); + Capture stateChangeCapture = expectThrottled(penaltyMs); + expectMovedToPending(task); + + control.replay(); + + throttler.taskChangedState(TaskStateChange.transition(task, INIT)); + stateChangeCapture.getValue().run(); + } + + @Test + public void testThrottledTaskReady() { + // Ensures that a sane delay is used when the task's penalty was already expired when + // the -> THROTTLED transition occurred (such as in the event of a scheduler failover). + + IScheduledTask task = makeTask("a", THROTTLED); + + long penaltyMs = 100; + + expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs); + Capture stateChangeCapture = expectThrottled(0); + expectMovedToPending(task); + + control.replay(); + + clock.advance(Amount.of(1L, Time.HOURS)); + throttler.taskChangedState(TaskStateChange.transition(task, INIT)); + stateChangeCapture.getValue().run(); + } + + private Capture expectThrottled(long penaltyMs) { + Capture stateChangeCapture = createCapture(); + expect(executor.schedule( + capture(stateChangeCapture), + eq(penaltyMs), + eq(TimeUnit.MILLISECONDS))) + .andReturn(null); + return stateChangeCapture; + } + + private void expectMovedToPending(IScheduledTask task) { + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + Tasks.id(task), + Optional.of(THROTTLED), + PENDING, + Optional.absent())) + .andReturn(StateChangeResult.SUCCESS); + } + + private IScheduledTask makeTask(String id, ScheduleStatus status) { + return IScheduledTask.build(new ScheduledTask() + .setTaskEvents(ImmutableList.of( + new TaskEvent() + .setStatus(status) + .setTimestamp(clock.nowMillis()))) + .setStatus(status) + .setAssignedTask(new AssignedTask().setTaskId(id))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java index dba1945..262f668 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java @@ -36,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.TaskIdGenerator; -import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; @@ -45,6 +44,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbUtil; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index 8b99e0f..f5e1dd0 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -58,8 +58,6 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl; -import org.apache.aurora.scheduler.async.RescheduleCalculator; -import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; @@ -67,6 +65,8 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl; import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.state.LockManagerImpl; import org.apache.aurora.scheduler.state.StateChangeResult;