Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9554173D2 for ; Wed, 22 Jul 2015 19:40:14 +0000 (UTC) Received: (qmail 76082 invoked by uid 500); 22 Jul 2015 19:39:59 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 75989 invoked by uid 500); 22 Jul 2015 19:39:59 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 75849 invoked by uid 99); 22 Jul 2015 19:39:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2015 19:39:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C324BE0215; Wed, 22 Jul 2015 19:39:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Wed, 22 Jul 2015 19:40:02 -0000 Message-Id: In-Reply-To: <76b9bd048c524f7194b93975aab8959e@git.apache.org> References: <76b9bd048c524f7194b93975aab8959e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java new file mode 100644 index 0000000..1611a3b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java @@ -0,0 +1,103 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.reconciliation; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.eventbus.Subscribe; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.BackoffStrategy; + +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.Storage; + +import static java.util.Objects.requireNonNull; + +/** + * Watches for task transitions into {@link ScheduleStatus#KILLING KILLING} and periodically + * retries {@link Driver#killTask(String)} until the task transitions. 
+ */ +public class KillRetry implements EventSubscriber { + private static final Logger LOG = Logger.getLogger(KillRetry.class.getName()); + + @VisibleForTesting + static final String RETRIES_COUNTER = "task_kill_retries"; + + private final Driver driver; + private final Storage storage; + private final ScheduledExecutorService executor; + private final BackoffStrategy backoffStrategy; + private final AtomicLong killRetries; + + @Inject + KillRetry( + Driver driver, + Storage storage, + @AsyncExecutor ScheduledExecutorService executor, + BackoffStrategy backoffStrategy, + StatsProvider statsProvider) { + + this.driver = requireNonNull(driver); + this.storage = requireNonNull(storage); + this.executor = requireNonNull(executor); + this.backoffStrategy = requireNonNull(backoffStrategy); + killRetries = statsProvider.makeCounter(RETRIES_COUNTER); + } + + @Subscribe + public void taskChangedState(TaskStateChange stateChange) { + if (stateChange.getNewState() == ScheduleStatus.KILLING) { + new KillAttempt(stateChange.getTaskId()).tryLater(); + } + } + + private class KillAttempt implements Runnable { + private final String taskId; + private final AtomicLong retryInMs = new AtomicLong(); + + KillAttempt(String taskId) { + this.taskId = taskId; + } + + void tryLater() { + retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get())); + executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS); + } + + @Override + public void run() { + Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING); + if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { + LOG.info("Task " + taskId + " not yet killed, retrying."); + + // Kill did not yet take effect, try again. 
+ driver.killTask(taskId); + killRetries.incrementAndGet(); + tryLater(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java new file mode 100644 index 0000000..406c077 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java @@ -0,0 +1,118 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.reconciliation; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; +import com.twitter.common.args.constraints.Positive; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.BackoffStrategy; +import com.twitter.common.util.TruncatedBinaryBackoff; + +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings; + +/** + * Binding module for state reconciliation and retry logic. + */ +public class ReconciliationModule extends AbstractModule { + + @CmdLine(name = "transient_task_state_timeout", + help = "The amount of time after which to treat a task stuck in a transient state as LOST.") + private static final Arg> TRANSIENT_TASK_STATE_TIMEOUT = + Arg.create(Amount.of(5L, Time.MINUTES)); + + @CmdLine(name = "initial_task_kill_retry_interval", + help = "When killing a task, retry after this delay if mesos has not responded," + + " backing off up to transient_task_state_timeout") + private static final Arg> INITIAL_TASK_KILL_RETRY_INTERVAL = + Arg.create(Amount.of(5L, Time.SECONDS)); + + // Reconciliation may create a big surge of status updates in a large cluster. Setting the default + // initial delay to 1 minute to ease up storage contention during scheduler start up. 
+ @CmdLine(name = "reconciliation_initial_delay", + help = "Initial amount of time to delay task reconciliation after scheduler start up.") + private static final Arg> RECONCILIATION_INITIAL_DELAY = + Arg.create(Amount.of(1L, Time.MINUTES)); + + @Positive + @CmdLine(name = "reconciliation_explicit_interval", + help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " + + "tasks known to scheduler.") + private static final Arg> RECONCILIATION_EXPLICIT_INTERVAL = + Arg.create(Amount.of(60L, Time.MINUTES)); + + @Positive + @CmdLine(name = "reconciliation_implicit_interval", + help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " + + "tasks known to Mesos.") + private static final Arg> RECONCILIATION_IMPLICIT_INTERVAL = + Arg.create(Amount.of(60L, Time.MINUTES)); + + @CmdLine(name = "reconciliation_schedule_spread", + help = "Difference between explicit and implicit reconciliation intervals intended to " + + "create a non-overlapping task reconciliation schedule.") + private static final Arg> RECONCILIATION_SCHEDULE_SPREAD = + Arg.create(Amount.of(30L, Time.MINUTES)); + + @Override + protected void configure() { + install(new PrivateModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { }) + .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get()); + + bind(TaskTimeout.class).in(Singleton.class); + expose(TaskTimeout.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class); + + install(new PrivateModule() { + @Override + protected void configure() { + bind(BackoffStrategy.class).toInstance( + new TruncatedBinaryBackoff( + INITIAL_TASK_KILL_RETRY_INTERVAL.get(), + TRANSIENT_TASK_STATE_TIMEOUT.get())); + bind(KillRetry.class).in(Singleton.class); + expose(KillRetry.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), KillRetry.class); + + install(new 
PrivateModule() { + @Override + protected void configure() { + bind(TaskReconcilerSettings.class).toInstance(new TaskReconcilerSettings( + RECONCILIATION_INITIAL_DELAY.get(), + RECONCILIATION_EXPLICIT_INTERVAL.get(), + RECONCILIATION_IMPLICIT_INTERVAL.get(), + RECONCILIATION_SCHEDULE_SPREAD.get())); + bind(TaskReconciler.class).in(Singleton.class); + expose(TaskReconciler.class); + } + }); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java new file mode 100644 index 0000000..653e52b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java @@ -0,0 +1,156 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.reconciliation; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.AbstractIdleService; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; + +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.Protos; + +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.twitter.common.quantity.Time.MINUTES; + +/** + * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task + * reconciliation to synchronize global task states. More on task reconciliation: + * http://mesos.apache.org/documentation/latest/reconciliation. 
+ */ +public class TaskReconciler extends AbstractIdleService { + + @VisibleForTesting + static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs"; + + @VisibleForTesting + static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs"; + + private final TaskReconcilerSettings settings; + private final Storage storage; + private final Driver driver; + private final ScheduledExecutorService executor; + private final AtomicLong explicitRuns; + private final AtomicLong implicitRuns; + + static class TaskReconcilerSettings { + private final Amount explicitInterval; + private final Amount implicitInterval; + private final long explicitDelayMinutes; + private final long implicitDelayMinutes; + + @VisibleForTesting + TaskReconcilerSettings( + Amount initialDelay, + Amount explicitInterval, + Amount implicitInterval, + Amount scheduleSpread) { + + this.explicitInterval = requireNonNull(explicitInterval); + this.implicitInterval = requireNonNull(implicitInterval); + explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES); + implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES); + checkArgument( + explicitDelayMinutes >= 0, + "Invalid explicit reconciliation delay: " + explicitDelayMinutes); + checkArgument( + implicitDelayMinutes >= 0L, + "Invalid implicit reconciliation delay: " + implicitDelayMinutes); + } + } + + @Inject + TaskReconciler( + TaskReconcilerSettings settings, + Storage storage, + Driver driver, + @AsyncExecutor ScheduledExecutorService executor, + StatsProvider stats) { + + this.settings = requireNonNull(settings); + this.storage = requireNonNull(storage); + this.driver = requireNonNull(driver); + this.executor = requireNonNull(executor); + this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME); + this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME); + } + + @Override + protected void startUp() { + // Schedule explicit reconciliation. 
+ executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + ImmutableSet active = FluentIterable + .from(Storage.Util.fetchTasks( + storage, + Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) + .transform(TASK_TO_PROTO) + .toSet(); + + driver.reconcileTasks(active); + explicitRuns.incrementAndGet(); + } + }, + settings.explicitDelayMinutes, + settings.explicitInterval.as(MINUTES), + MINUTES.getTimeUnit()); + + // Schedule implicit reconciliation. + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + driver.reconcileTasks(ImmutableSet.of()); + implicitRuns.incrementAndGet(); + } + }, + settings.implicitDelayMinutes, + settings.implicitInterval.as(MINUTES), + MINUTES.getTimeUnit()); + } + + @Override + protected void shutDown() { + // Nothing to do - await VM shutdown. + } + + @VisibleForTesting + static final Function TASK_TO_PROTO = + t -> Protos.TaskStatus.newBuilder() + // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation + // purposes. This is the artifact of the native API. The new HTTP Mesos API will be + // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side. + // Setting TASK_RUNNING as a safe dummy value here. 
+ .setState(Protos.TaskState.TASK_RUNNING) + .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build()) + .build(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java new file mode 100644 index 0000000..fb83972 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java @@ -0,0 +1,158 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.reconciliation; + +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractIdleService; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; + +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.scheduler.storage.Storage.MutateWork; + +/** + * Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck + * tasks will be transitioned to the LOST state. 
+ */ +class TaskTimeout extends AbstractIdleService implements EventSubscriber { + private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName()); + + @VisibleForTesting + static final Amount NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS); + + @VisibleForTesting + static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks"; + + @VisibleForTesting + static final Optional TIMEOUT_MESSAGE = Optional.of("Task timed out"); + + @VisibleForTesting + static final Set TRANSIENT_STATES = EnumSet.of( + ScheduleStatus.ASSIGNED, + ScheduleStatus.PREEMPTING, + ScheduleStatus.RESTARTING, + ScheduleStatus.KILLING, + ScheduleStatus.DRAINING); + + private final ScheduledExecutorService executor; + private final Storage storage; + private final StateManager stateManager; + private final Amount timeout; + private final AtomicLong timedOutTasks; + + @Inject + TaskTimeout( + @AsyncExecutor ScheduledExecutorService executor, + Storage storage, + StateManager stateManager, + Amount timeout, + StatsProvider statsProvider) { + + this.executor = requireNonNull(executor); + this.storage = requireNonNull(storage); + this.stateManager = requireNonNull(stateManager); + this.timeout = requireNonNull(timeout); + this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER); + } + + private static boolean isTransient(ScheduleStatus status) { + return TRANSIENT_STATES.contains(status); + } + + @Override + protected void startUp() { + // No work to do here for startup, however we leverage the state tracking in + // AbstractIdleService. + } + + @Override + protected void shutDown() { + // Nothing to do for shutting down. 
+ } + + private class TimedOutTaskHandler implements Runnable { + private final String taskId; + private final ScheduleStatus newState; + + TimedOutTaskHandler(String taskId, ScheduleStatus newState) { + this.taskId = taskId; + this.newState = newState; + } + + @Override + public void run() { + if (isRunning()) { + // This query acts as a CAS by including the state that we expect the task to be in + // if the timeout is still valid. Ideally, the future would have already been + // canceled, but in the event of a state transition race, including transientState + // prevents an unintended task timeout. + // Note: This requires LOST transitions trigger Driver.killTask. + StateChangeResult result = storage.write(new MutateWork.Quiet() { + @Override + public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) { + return stateManager.changeState( + storeProvider, + taskId, + Optional.of(newState), + ScheduleStatus.LOST, + TIMEOUT_MESSAGE); + } + }); + + if (result == StateChangeResult.SUCCESS) { + LOG.info("Timeout reached for task " + taskId + ":" + taskId); + timedOutTasks.incrementAndGet(); + } + } else { + // Our service is not yet started. We don't want to lose track of the task, so + // we will try again later. 
+ LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY); + executor.schedule( + this, + NOT_STARTED_RETRY.getValue(), + NOT_STARTED_RETRY.getUnit().getTimeUnit()); + } + } + } + + @Subscribe + public void recordStateChange(TaskStateChange change) { + if (isTransient(change.getNewState())) { + executor.schedule( + new TimedOutTaskHandler(change.getTaskId(), change.getNewState()), + timeout.getValue(), + timeout.getUnit().getTimeUnit()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java new file mode 100644 index 0000000..bfc23cd --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java @@ -0,0 +1,174 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.BackoffStrategy; +import com.twitter.common.util.Random; + +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskEvent; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.DRAINING; +import static org.apache.aurora.gen.ScheduleStatus.KILLING; +import static org.apache.aurora.gen.ScheduleStatus.RESTARTING; + +/** + * Calculates scheduling delays for tasks. + */ +public interface RescheduleCalculator { + /** + * Calculates the delay, in milliseconds, before the task should be considered eligible for + * (re)scheduling at scheduler startup. + * + * @param task Task to calculate delay for. + * @return Delay in msec. + */ + long getStartupScheduleDelayMs(IScheduledTask task); + + /** + * Calculates the penalty, in milliseconds, that a task should be penalized before being + * eligible for rescheduling. + * + * @param task Task to calculate delay for. + * @return Delay in msec. 
+ */ + long getFlappingPenaltyMs(IScheduledTask task); + + class RescheduleCalculatorImpl implements RescheduleCalculator { + + private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName()); + + private final Storage storage; + private final RescheduleCalculatorSettings settings; + // TODO(wfarner): Inject 'random' in the constructor for better test coverage. + private final Random random = new Random.SystemRandom(new java.util.Random()); + + private static final Predicate IS_ACTIVE_STATUS = + Predicates.in(Tasks.ACTIVE_STATES); + + private static final Set INTERRUPTED_TASK_STATES = + EnumSet.of(RESTARTING, KILLING, DRAINING); + + private final Predicate flapped = new Predicate() { + @Override + public boolean apply(IScheduledTask task) { + if (!task.isSetTaskEvents()) { + return false; + } + + List events = Lists.reverse(task.getTaskEvents()); + + // Avoid penalizing tasks that were interrupted by outside action, such as a user + // restarting them. + if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS), + Predicates.in(INTERRUPTED_TASK_STATES))) { + return false; + } + + ITaskEvent terminalEvent = Iterables.get(events, 0); + ScheduleStatus terminalState = terminalEvent.getStatus(); + Preconditions.checkState(Tasks.isTerminated(terminalState)); + + ITaskEvent activeEvent = Iterables.find( + events, + Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS)); + + long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS); + + return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs; + } + }; + + @VisibleForTesting + public static class RescheduleCalculatorSettings { + private final BackoffStrategy flappingTaskBackoff; + private final Amount flappingTaskThreashold; + private final Amount maxStartupRescheduleDelay; + + public RescheduleCalculatorSettings( + BackoffStrategy flappingTaskBackoff, + Amount flappingTaskThreashold, + Amount maxStartupRescheduleDelay) { + + 
this.flappingTaskBackoff = requireNonNull(flappingTaskBackoff); + this.flappingTaskThreashold = requireNonNull(flappingTaskThreashold); + this.maxStartupRescheduleDelay = requireNonNull(maxStartupRescheduleDelay); + } + } + + @Inject + RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) { + this.storage = requireNonNull(storage); + this.settings = requireNonNull(settings); + } + + @Override + public long getStartupScheduleDelayMs(IScheduledTask task) { + return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue()) + + getFlappingPenaltyMs(task); + } + + private Optional getTaskAncestor(IScheduledTask task) { + if (!task.isSetAncestorId()) { + return Optional.absent(); + } + + Iterable res = + Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId())); + + return Optional.fromNullable(Iterables.getOnlyElement(res, null)); + } + + @Override + public long getFlappingPenaltyMs(IScheduledTask task) { + Optional curTask = getTaskAncestor(task); + long penaltyMs = 0; + while (curTask.isPresent() && flapped.apply(curTask.get())) { + LOG.info( + String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get()))); + long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs); + // If the backoff strategy is truncated then there is no need for us to continue. 
+ if (newPenalty == penaltyMs) { + break; + } + penaltyMs = newPenalty; + curTask = getTaskAncestor(curTask.get()); + } + + return penaltyMs; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java new file mode 100644 index 0000000..c7a1a46 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java @@ -0,0 +1,134 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.scheduling; + +import javax.inject.Singleton; + +import com.google.common.util.concurrent.RateLimiter; +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; +import com.twitter.common.args.constraints.Positive; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.TruncatedBinaryBackoff; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl; + +/** + * Binding module for task scheduling logic (task groups, the task scheduler, and throttling). + */ +public class SchedulingModule extends AbstractModule { + + @CmdLine(name = "max_schedule_attempts_per_sec", + help = "Maximum number of scheduling attempts to make per second.") + private static final Arg MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(40D); + + @CmdLine(name = "flapping_task_threshold", + help = "A task that repeatedly runs for less than this time is considered to be flapping.") + private static final Arg> FLAPPING_THRESHOLD = + Arg.create(Amount.of(5L, Time.MINUTES)); + + @CmdLine(name = "initial_flapping_task_delay", + help = "Initial amount of time to wait before attempting to schedule a flapping task.") + private static final Arg> INITIAL_FLAPPING_DELAY = + Arg.create(Amount.of(30L, Time.SECONDS)); + + @CmdLine(name = "max_flapping_task_delay", + help = "Maximum delay between attempts to schedule a flapping task.") + private static final Arg> MAX_FLAPPING_DELAY = + Arg.create(Amount.of(5L, Time.MINUTES)); + + @CmdLine(name = "max_reschedule_task_delay_on_startup", + help = "Upper bound of random delay for 
pending task rescheduling on scheduler startup.") + private static final Arg> 
MAX_RESCHEDULING_DELAY = + Arg.create(Amount.of(30, Time.SECONDS)); + + @Positive + @CmdLine(name = "first_schedule_delay", + help = "Initial amount of time to wait before first attempting to schedule a PENDING task.") + private static final Arg> FIRST_SCHEDULE_DELAY = + Arg.create(Amount.of(1L, Time.MILLISECONDS)); + + @Positive + @CmdLine(name = "initial_schedule_penalty", + help = "Initial amount of time to wait before attempting to schedule a task that has failed" + + " to schedule.") + private static final Arg> INITIAL_SCHEDULE_PENALTY = + Arg.create(Amount.of(1L, Time.SECONDS)); + + @CmdLine(name = "max_schedule_penalty", + help = "Maximum delay between attempts to schedule a PENDING tasks.") + private static final Arg> MAX_SCHEDULE_PENALTY = + Arg.create(Amount.of(1L, Time.MINUTES)); + + @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while " + + "trying to satisfy a task preempting another.") + private static final Arg> RESERVATION_DURATION = + Arg.create(Amount.of(3L, Time.MINUTES)); + + @Override + protected void configure() { + install(new PrivateModule() { + @Override + protected void configure() { + bind(TaskGroups.TaskGroupsSettings.class).toInstance(new TaskGroups.TaskGroupsSettings( + FIRST_SCHEDULE_DELAY.get(), + new TruncatedBinaryBackoff( + INITIAL_SCHEDULE_PENALTY.get(), + MAX_SCHEDULE_PENALTY.get()), + RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()))); + + bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class) + .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings( + new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()), + FLAPPING_THRESHOLD.get(), + MAX_RESCHEDULING_DELAY.get())); + + bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class); + expose(RescheduleCalculator.class); + bind(TaskGroups.class).in(Singleton.class); + expose(TaskGroups.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), 
TaskGroups.class); + + install(new PrivateModule() { + @Override + protected void configure() { + bind(new TypeLiteral>() { }).in(Singleton.class); + bind(BiCache.BiCacheSettings.class).toInstance( + new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size")); + bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); + bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class); + expose(TaskScheduler.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); + + install(new PrivateModule() { + @Override + protected void configure() { + bind(TaskThrottler.class).in(Singleton.class); + expose(TaskThrottler.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java new file mode 100644 index 0000000..5d31955 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.Queue; +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import org.apache.aurora.scheduler.base.TaskGroupKey; + +/** + * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire. + */ +class TaskGroup { + private final TaskGroupKey key; + private long penaltyMs; + private final Queue tasks; + + TaskGroup(TaskGroupKey key, String initialTaskId) { + this.key = key; + this.penaltyMs = 0; + this.tasks = Lists.newLinkedList(); + this.tasks.add(initialTaskId); + } + + synchronized TaskGroupKey getKey() { + return key; + } + + synchronized Optional peek() { + return Optional.fromNullable(tasks.peek()); + } + + synchronized boolean hasMore() { + return !tasks.isEmpty(); + } + + synchronized void remove(String taskId) { + tasks.remove(taskId); + } + + synchronized void offer(String taskId) { + tasks.offer(taskId); + } + + synchronized void setPenaltyMs(long penaltyMs) { + this.penaltyMs = penaltyMs; + } + + // Begin methods used for debug interfaces (read-only snapshots of the group state). 
+ + public synchronized String getName() { + return key.toString(); + } + + public synchronized Set getTaskIds() { + return ImmutableSet.copyOf(tasks); + } + + public synchronized long getPenaltyMs() { + return penaltyMs; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java new file mode 100644 index 0000000..3f262bf --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -0,0 +1,239 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.RateLimiter; +import com.twitter.common.application.ShutdownRegistry; +import com.twitter.common.base.Command; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.SlidingStats; +import com.twitter.common.stats.Stats; +import com.twitter.common.util.BackoffStrategy; +import com.twitter.common.util.concurrent.ExecutorServiceShutdown; + +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; + +/** + * A collection of task groups, where a task group is a collection of tasks that are known to be + * equal in the way they schedule.
This is expected to be tasks associated with the same job key, + * that also have {@code equal()} {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig} + * values. + * <p>
+ * This is used to prevent redundant work in trying to schedule tasks as well as to provide + * nearly-equal responsiveness when scheduling across jobs. In other words, a 1000 instance job + * cannot starve a 1 instance job. + */ +public class TaskGroups implements EventSubscriber { + + private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName()); + + private final ConcurrentMap groups = Maps.newConcurrentMap(); + private final ScheduledExecutorService executor; + private final TaskScheduler taskScheduler; + private final long firstScheduleDelay; + private final BackoffStrategy backoff; + private final RescheduleCalculator rescheduleCalculator; + + // Track the penalties of tasks at the time they were scheduled. This is to provide data that + // may influence the selection of a different backoff strategy. + private final SlidingStats scheduledTaskPenalties = + new SlidingStats("scheduled_task_penalty", "ms"); + + public static class TaskGroupsSettings { + private final Amount firstScheduleDelay; + private final BackoffStrategy taskGroupBackoff; + private final RateLimiter rateLimiter; + + public TaskGroupsSettings( + Amount firstScheduleDelay, + BackoffStrategy taskGroupBackoff, + RateLimiter rateLimiter) { + + this.firstScheduleDelay = requireNonNull(firstScheduleDelay); + this.taskGroupBackoff = requireNonNull(taskGroupBackoff); + this.rateLimiter = requireNonNull(rateLimiter); + } + } + + @Inject + TaskGroups( + ShutdownRegistry shutdownRegistry, + TaskGroupsSettings settings, + TaskScheduler taskScheduler, + RescheduleCalculator rescheduleCalculator) { + + this( + createThreadPool(shutdownRegistry), + settings.firstScheduleDelay, + settings.taskGroupBackoff, + settings.rateLimiter, + taskScheduler, + rescheduleCalculator); + } + + @VisibleForTesting + TaskGroups( + final ScheduledExecutorService executor, + final Amount firstScheduleDelay, + final BackoffStrategy backoff, + final RateLimiter rateLimiter, + final TaskScheduler taskScheduler, +
final RescheduleCalculator rescheduleCalculator) { + + requireNonNull(firstScheduleDelay); + Preconditions.checkArgument(firstScheduleDelay.getValue() > 0); + + this.executor = requireNonNull(executor); + requireNonNull(rateLimiter); + requireNonNull(taskScheduler); + this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS); + this.backoff = requireNonNull(backoff); + this.rescheduleCalculator = requireNonNull(rescheduleCalculator); + + this.taskScheduler = new TaskScheduler() { + @Override + public boolean schedule(String taskId) { + rateLimiter.acquire(); + return taskScheduler.schedule(taskId); + } + }; + } + + private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) { + // Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could + // remove a group while a task is being added to it. + if (group.hasMore()) { + executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS); + } else { + groups.remove(group.getKey()); + } + } + + private void startGroup(final TaskGroup group) { + Runnable monitor = new Runnable() { + @Override + public void run() { + Optional taskId = group.peek(); + long penaltyMs = 0; + if (taskId.isPresent()) { + if (taskScheduler.schedule(taskId.get())) { + scheduledTaskPenalties.accumulate(group.getPenaltyMs()); + group.remove(taskId.get()); + if (group.hasMore()) { + penaltyMs = firstScheduleDelay; + } + } else { + penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs()); + } + } + + group.setPenaltyMs(penaltyMs); + evaluateGroupLater(this, group); + } + }; + evaluateGroupLater(monitor, group); + } + + private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) { + final ScheduledThreadPoolExecutor executor = + AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG); + + Stats.exportSize("schedule_queue_size", executor.getQueue()); + shutdownRegistry.addAction(new Command() { + @Override + public void execute() { + new
ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); + } + }); + return executor; + } + + /** + * Informs the task groups of a task state change. + * <p>
+ * This is used to observe {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks and begin + * attempting to schedule them. + * + * @param stateChange State change notification. + */ + @Subscribe + public synchronized void taskChangedState(TaskStateChange stateChange) { + if (stateChange.getNewState() == PENDING) { + IScheduledTask task = stateChange.getTask(); + TaskGroupKey key = TaskGroupKey.from(task.getAssignedTask().getTask()); + TaskGroup newGroup = new TaskGroup(key, Tasks.id(task)); + TaskGroup existing = groups.putIfAbsent(key, newGroup); + if (existing == null) { + long penaltyMs; + if (stateChange.isTransition()) { + penaltyMs = firstScheduleDelay; + } else { + penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task); + } + newGroup.setPenaltyMs(penaltyMs); + startGroup(newGroup); + } else { + existing.offer(Tasks.id(task)); + } + } + } + + /** + * Removes deleted tasks from any task group that is still holding them. + * + * @param deleted Tasks deleted event. + */ + @Subscribe + public synchronized void tasksDeleted(TasksDeleted deleted) { + for (IAssignedTask task + : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) { + TaskGroup group = groups.get(TaskGroupKey.from(task.getTask())); + if (group != null) { + group.remove(task.getTaskId()); + } + } + } + + public Iterable getGroups() { + return ImmutableSet.copyOf(groups.values()); + } + +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java new file mode 100644 index 0000000..04e5063 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -0,0 +1,248 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not
use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.eventbus.Subscribe; +import com.twitter.common.inject.TimedInterceptor.Timed; +import com.twitter.common.stats.Stats; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.preemptor.Preemptor; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.state.TaskAssigner; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import
org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; + +/** + * Enables scheduling and preemption of tasks. + */ +public interface TaskScheduler extends EventSubscriber { + + /** + * Attempts to schedule a task, possibly performing irreversible actions. + * + * @param taskId The task to attempt to schedule. + * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should + * call schedule again if {@code false} is returned. + */ + boolean schedule(String taskId); + + /** + * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task + * backs off after a failed scheduling attempt. + * <p>
+ * Pending tasks are advertised to the scheduler via internal pubsub notifications. + */ + class TaskSchedulerImpl implements TaskScheduler { + /** + * Binding annotation for the time duration of reservations. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface ReservationDuration { } + + private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName()); + + private final Storage storage; + private final StateManager stateManager; + private final TaskAssigner assigner; + private final OfferManager offerManager; + private final Preemptor preemptor; + private final BiCache reservations; + + private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); + private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); + private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match"); + + @Inject + TaskSchedulerImpl( + Storage storage, + StateManager stateManager, + TaskAssigner assigner, + OfferManager offerManager, + Preemptor preemptor, + BiCache reservations) { + + this.storage = requireNonNull(storage); + this.stateManager = requireNonNull(stateManager); + this.assigner = requireNonNull(assigner); + this.offerManager = requireNonNull(offerManager); + this.preemptor = requireNonNull(preemptor); + this.reservations = requireNonNull(reservations); + } + + private Function getAssignerFunction( + final MutableStoreProvider storeProvider, + final ResourceRequest resourceRequest, + final String taskId) { + + // TODO(wfarner): Turn this into Predicate, and in the caller, find the first match + // and perform the assignment at the very end. This will allow us to use optimistic locking + // at the top of the stack and avoid holding the write lock for too long.
+ return new Function() { + @Override + public Assignment apply(HostOffer offer) { + Optional reservation = + reservations.get(offer.getOffer().getSlaveId().getValue()); + + if (reservation.isPresent()) { + if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) { + // Slave is reserved to satisfy this task group. + return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId); + } else { + // Slave is reserved for another task group. + return Assignment.failure(); + } + } else { + // Slave is not reserved. + return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId); + } + } + }; + } + + @VisibleForTesting + static final Optional LAUNCH_FAILED_MSG = + Optional.of("Unknown exception attempting to schedule task."); + + @Timed("task_schedule_attempt") + @Override + public boolean schedule(final String taskId) { + attemptsFired.incrementAndGet(); + try { + return storage.write(new MutateWork.Quiet() { + @Override + public Boolean apply(MutableStoreProvider store) { + return scheduleTask(store, taskId); + } + }); + } catch (RuntimeException e) { + // We catch the generic unchecked exception here to ensure tasks are not abandoned + // if there is a transient issue resulting in an unchecked exception.
+ LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e); + attemptsFailed.incrementAndGet(); + return false; + } + } + + @Timed("task_schedule_attempt_locked") + protected boolean scheduleTask(MutableStoreProvider store, String taskId) { + LOG.fine("Attempting to schedule task " + taskId); + IAssignedTask assignedTask = Iterables.getOnlyElement( + Iterables.transform( + store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)), + Tasks.SCHEDULED_TO_ASSIGNED), + null); + + if (assignedTask == null) { + LOG.warning("Failed to look up task " + taskId + ", it may have been deleted."); + } else { + ITaskConfig task = assignedTask.getTask(); + AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); + try { + boolean launched = offerManager.launchFirst( + getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId), + TaskGroupKey.from(task)); + + if (!launched) { + // Task could not be scheduled, so try to secure a reservation through preemption. + // TODO(maxim): Now that preemption slots are searched asynchronously, consider + // retrying a launch attempt within the current scheduling round IFF a reservation is + // available. + maybePreemptFor(assignedTask, aggregate, store); + attemptsNoMatch.incrementAndGet(); + return false; + } + } catch (OfferManager.LaunchException e) { + LOG.log(Level.WARNING, "Failed to launch task.", e); + attemptsFailed.incrementAndGet(); + + // The attempt to schedule the task failed, so we need to backpedal on the + // assignment. + // It is moved to the LOST state and a new task will move to PENDING to replace it. + // Should the state change fail due to storage issues, that's okay. The task will + // time out in the ASSIGNED state and be moved to LOST.
+ stateManager.changeState( + store, + taskId, + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG); + } + } + + return true; + } + + private void maybePreemptFor( + IAssignedTask task, + AttributeAggregate jobState, + MutableStoreProvider storeProvider) { + + if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { + return; + } + Optional slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); + if (slaveId.isPresent()) { + reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); + } + } + + @Subscribe + public void taskChanged(final TaskStateChange stateChangeEvent) { + if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { + IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); + if (assigned.getSlaveId() != null) { + reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java new file mode 100644 index 0000000..b86bd28 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java @@ -0,0 +1,97 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import com.google.common.base.Optional; +import com.google.common.eventbus.Subscribe; +import com.twitter.common.stats.SlidingStats; +import com.twitter.common.util.Clock; + +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; + +/** + * A holding area for tasks that have been throttled. Tasks entering the + * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED} state will be transitioned to + * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} after the penalty period (as dictated by + * {@link RescheduleCalculator}) has expired, counted from the timestamp of its latest event.
+ */ +class TaskThrottler implements EventSubscriber { + + private final RescheduleCalculator rescheduleCalculator; + private final Clock clock; + private final ScheduledExecutorService executor; + private final Storage storage; + private final StateManager stateManager; + + private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms"); + + @Inject + TaskThrottler( + RescheduleCalculator rescheduleCalculator, + Clock clock, + @AsyncExecutor ScheduledExecutorService executor, + Storage storage, + StateManager stateManager) { + + this.rescheduleCalculator = requireNonNull(rescheduleCalculator); + this.clock = requireNonNull(clock); + this.executor = requireNonNull(executor); + this.storage = requireNonNull(storage); + this.stateManager = requireNonNull(stateManager); + } + + @Subscribe + public void taskChangedState(final TaskStateChange stateChange) { + if (stateChange.getNewState() == THROTTLED) { + long readyAtMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp() + + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask()); + long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); + throttleStats.accumulate(delayMs); + executor.schedule( + new Runnable() { + @Override + public void run() { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + stateManager.changeState( + storeProvider, + stateChange.getTaskId(), + Optional.of(THROTTLED), + PENDING, + Optional.absent()); + } + }); + } + }, + delayMs, + TimeUnit.MILLISECONDS); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index ff33d0b..08844d0 100644 ---
a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -45,12 +45,12 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.TaskIdGenerator; -import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.state.SideEffect.Action; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.TaskStore; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 825e772..b96c84c 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -31,9 +31,9 @@ import com.twitter.common.quantity.Time; import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.configuration.Resources; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource; import 
org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java index 4ed6b15..5384307 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java @@ -29,13 +29,6 @@ import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; import org.apache.aurora.scheduler.AppStartup; -import org.apache.aurora.scheduler.async.preemptor.Preemptor; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.MaintenanceController; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Before; @@ -73,14 +66,7 @@ public class AsyncModuleTest extends EasyMockTest { protected void configure() { bind(StatsProvider.class).toInstance(statsProvider); bindMock(Clock.class); - bindMock(Driver.class); - bindMock(SchedulingFilter.class); - bindMock(MaintenanceController.class); - bindMock(Preemptor.class); - bindMock(StateManager.class); - bindMock(TaskAssigner.class); bindMock(Thread.UncaughtExceptionHandler.class); - bind(Storage.class).toInstance(storageUtil.storage); } }); } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java deleted file mode 100644 index cb549a1..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; - -import com.google.common.collect.ImmutableSet; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.Clock; - -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.JobUpdateKey; -import org.apache.aurora.scheduler.async.JobUpdateHistoryPruner.HistoryPrunerSettings; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; -import org.junit.Test; - -import static org.easymock.EasyMock.expect; - -public class JobUpdateHistoryPrunerTest extends EasyMockTest { - @Test - public void testExecution() throws Exception { - StorageTestUtil storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - - final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); - FakeScheduledExecutor executorClock = - FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2); - - Clock mockClock = createMock(Clock.class); - expect(mockClock.nowMillis()).andReturn(2L).times(2); - - expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)) - .andReturn(ImmutableSet.of( - IJobUpdateKey.build( - new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1")))); - expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of()); - - control.replay(); - - executorClock.assertEmpty(); - JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner( - mockClock, - executor, - storageUtil.storage, - new HistoryPrunerSettings( - Amount.of(1L, Time.MILLISECONDS), - Amount.of(1L, Time.MILLISECONDS), - 1)); - - pruner.startAsync().awaitRunning(); - executorClock.advance(Amount.of(1L, Time.MILLISECONDS)); - executorClock.advance(Amount.of(1L, 
Time.MILLISECONDS)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java deleted file mode 100644 index a295fe8..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.ScheduledExecutorService; - -import javax.inject.Singleton; - -import com.google.common.eventbus.EventBus; -import com.google.common.testing.TearDown; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.twitter.common.application.modules.LifecycleModule; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.PubsubTestUtil; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class KillRetryTest extends EasyMockTest { - - private Driver driver; - private StorageTestUtil storageUtil; - private BackoffStrategy backoffStrategy; - private FakeScheduledExecutor clock; - private EventBus eventBus; - private FakeStatsProvider statsProvider; - - @Before - 
public void setUp() throws Exception { - driver = createMock(Driver.class); - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - backoffStrategy = createMock(BackoffStrategy.class); - final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); - clock = FakeScheduledExecutor.scheduleExecutor(executorMock); - addTearDown(new TearDown() { - @Override - public void tearDown() { - clock.assertEmpty(); - } - }); - statsProvider = new FakeStatsProvider(); - - Injector injector = Guice.createInjector( - new LifecycleModule(), - new PubsubEventModule(false), - new AbstractModule() { - @Override - protected void configure() { - bind(Driver.class).toInstance(driver); - bind(Storage.class).toInstance(storageUtil.storage); - bind(ScheduledExecutorService.class).toInstance(executorMock); - PubsubEventModule.bindSubscriber(binder(), KillRetry.class); - bind(KillRetry.class).in(Singleton.class); - bind(BackoffStrategy.class).toInstance(backoffStrategy); - bind(StatsProvider.class).toInstance(statsProvider); - bind(UncaughtExceptionHandler.class) - .toInstance(createMock(UncaughtExceptionHandler.class)); - } - } - ); - eventBus = injector.getInstance(EventBus.class); - PubsubTestUtil.startPubsub(injector); - } - - private static IScheduledTask makeTask(String id, ScheduleStatus status) { - return IScheduledTask.build(new ScheduledTask() - .setStatus(status) - .setAssignedTask(new AssignedTask().setTaskId(id))); - } - - private void moveToKilling(String taskId) { - eventBus.post(TaskStateChange.transition(makeTask(taskId, KILLING), RUNNING)); - } - - private static Query.Builder killingQuery(String taskId) { - return Query.taskScoped(taskId).byStatus(KILLING); - } - - private void expectGetRetryDelay(long prevRetryMs, long retryInMs) { - expect(backoffStrategy.calculateBackoffMs(prevRetryMs)).andReturn(retryInMs); - } - - private void expectRetry(String taskId, long prevRetryMs, long nextRetryMs) { - 
storageUtil.expectTaskFetch(killingQuery(taskId), makeTask(taskId, KILLING)); - driver.killTask(taskId); - expectGetRetryDelay(prevRetryMs, nextRetryMs); - } - - @Test - public void testRetries() { - String taskId = "a"; - expectGetRetryDelay(0, 100); - expectRetry(taskId, 100, 1000); - expectRetry(taskId, 1000, 10000); - - // Signal that task has transitioned. - storageUtil.expectTaskFetch(killingQuery(taskId)); - - control.replay(); - - moveToKilling(taskId); - clock.advance(Amount.of(100L, Time.MILLISECONDS)); - clock.advance(Amount.of(1000L, Time.MILLISECONDS)); - clock.advance(Amount.of(10000L, Time.MILLISECONDS)); - assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); - } - - @Test - public void testDoesNotRetry() { - String taskId = "a"; - expectGetRetryDelay(0, 100); - - storageUtil.expectTaskFetch(killingQuery(taskId)); - - control.replay(); - - moveToKilling(taskId); - clock.advance(Amount.of(100L, Time.MILLISECONDS)); - assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java deleted file mode 100644 index 874a124..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.logging.Level; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.testing.TearDown; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl; -import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; -import org.apache.mesos.Protos.TaskInfo; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import 
static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class OfferManagerImplTest extends EasyMockTest { - - private static final Amount RETURN_DELAY = Amount.of(1L, Time.DAYS); - private static final String HOST_A = "HOST_A"; - private static final HostOffer OFFER_A = new HostOffer( - Offers.makeOffer("OFFER_A", HOST_A), - IHostAttributes.build(new HostAttributes().setMode(NONE))); - private static final String HOST_B = "HOST_B"; - private static final HostOffer OFFER_B = new HostOffer( - Offers.makeOffer("OFFER_B", HOST_B), - IHostAttributes.build(new HostAttributes().setMode(NONE))); - private static final String HOST_C = "HOST_C"; - private static final HostOffer OFFER_C = new HostOffer( - Offers.makeOffer("OFFER_C", HOST_C), - IHostAttributes.build(new HostAttributes().setMode(NONE))); - private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from( - ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name")))); - - private Driver driver; - private FakeScheduledExecutor clock; - private Function offerAcceptor; - private OfferManagerImpl offerManager; - - @Before - public void setUp() { - offerManager.LOG.setLevel(Level.FINE); - addTearDown(new TearDown() { - @Override - public void tearDown() throws Exception { - offerManager.LOG.setLevel(Level.INFO); - } - }); - driver = createMock(Driver.class); - ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); - clock = FakeScheduledExecutor.scheduleExecutor(executorMock); - - addTearDown(new TearDown() { - @Override - public void tearDown() throws Exception { - clock.assertEmpty(); - } - }); - offerAcceptor = createMock(new Clazz>() { }); - OfferReturnDelay returnDelay = new OfferReturnDelay() { - @Override - public Amount get() { - return RETURN_DELAY; - } - }; - offerManager = new OfferManagerImpl(driver, returnDelay, executorMock); - } - - 
@Test - public void testOffersSorted() throws Exception { - // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last. - - HostOffer offerA = setMode(OFFER_A, DRAINING); - HostOffer offerC = setMode(OFFER_C, DRAINING); - - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_B.getOffer().getId(), task); - - driver.declineOffer(offerA.getOffer().getId()); - driver.declineOffer(offerC.getOffer().getId()); - - control.replay(); - - offerManager.addOffer(offerA); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(offerC); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - clock.advance(RETURN_DELAY); - } - - @Test - public void testGetOffersReturnsAllOffers() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); - - control.replay(); - - offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - - offerManager.cancelOffer(OFFER_A.getOffer().getId()); - assertTrue(Iterables.isEmpty(offerManager.getOffers())); - - clock.advance(RETURN_DELAY); - } - - @Test - public void testOfferFilteringDueToStaticBan() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); - - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_B.getOffer().getId(), task); - - driver.declineOffer(OFFER_A.getOffer().getId()); - - control.replay(); - - offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - // Run again to make sure all offers are banned (via no expectations set). 
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - // Add a new offer to accept the task previously banned for OFFER_A. - offerManager.addOffer(OFFER_B); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - clock.advance(RETURN_DELAY); - } - - @Test - public void testStaticBanIsCleared() throws Exception { - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100)))); - - TaskInfo task = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_A.getOffer().getId(), task); - - expect(offerAcceptor.apply(OFFER_A)) - .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining")))); - - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); - driver.launchTask(OFFER_A.getOffer().getId(), task); - - driver.declineOffer(OFFER_A.getOffer().getId()); - - control.replay(); - - offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - // Make sure the static ban is cleared when the offers are returned. - clock.advance(RETURN_DELAY); - offerManager.addOffer(OFFER_A); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - offerManager.addOffer(OFFER_A); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - // Make sure the static ban is cleared when driver is disconnected. 
- offerManager.driverDisconnected(new DriverDisconnected()); - offerManager.addOffer(OFFER_A); - assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - - clock.advance(RETURN_DELAY); - } - - @Test - public void testFlushOffers() throws Exception { - control.replay(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - offerManager.driverDisconnected(new DriverDisconnected()); - assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); - clock.advance(RETURN_DELAY); - } - - @Test - public void testDeclineOffer() throws Exception { - driver.declineOffer(OFFER_A.getOffer().getId()); - - control.replay(); - - offerManager.addOffer(OFFER_A); - clock.advance(RETURN_DELAY); - } - - private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) { - return new HostOffer( - offer.getOffer(), - IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode))); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/Offers.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/Offers.java b/src/test/java/org/apache/aurora/scheduler/async/Offers.java deleted file mode 100644 index 8293dd1..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/Offers.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.SlaveID; - -/** - * Utility class for creating resource offers. - */ -final class Offers { - private Offers() { - // Utility class. - } - - static final String DEFAULT_HOST = "hostname"; - - static Offer makeOffer(String offerId) { - return Offers.makeOffer(offerId, DEFAULT_HOST); - } - - static Offer makeOffer(String offerId, String hostName) { - return Offer.newBuilder() - .setId(OfferID.newBuilder().setValue(offerId)) - .setFrameworkId(FrameworkID.newBuilder().setValue("framework_id")) - .setSlaveId(SlaveID.newBuilder().setValue("slave_id-" + offerId)) - .setHostname(hostName) - .build(); - } -}