Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3D96818775 for ; Tue, 23 Jun 2015 20:43:12 +0000 (UTC) Received: (qmail 1408 invoked by uid 500); 23 Jun 2015 20:43:12 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 1375 invoked by uid 500); 23 Jun 2015 20:43:12 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 1366 invoked by uid 99); 23 Jun 2015 20:43:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jun 2015 20:43:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED223E17FA; Tue, 23 Jun 2015 20:43:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.apache.org Message-Id: <8f98536220c641dbb218c0bc4f30e0ce@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Refactoring TaskLauncher. Date: Tue, 23 Jun 2015 20:43:11 +0000 (UTC) Repository: aurora Updated Branches: refs/heads/master 68c46205e -> 4b8c34c0e Refactoring TaskLauncher. Bugs closed: AURORA-1334 Reviewed at https://reviews.apache.org/r/35761/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4b8c34c0 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4b8c34c0 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4b8c34c0 Branch: refs/heads/master Commit: 4b8c34c0e924b9fdd170eb7f05bd6b92580ce7b8 Parents: 68c4620 Author: Maxim Khutornenko Authored: Tue Jun 23 13:42:54 2015 -0700 Committer: Maxim Khutornenko Committed: Tue Jun 23 13:42:54 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/StatusUpdateBenchmark.java | 29 +- .../aurora/scheduler/SchedulerModule.java | 24 +- .../apache/aurora/scheduler/TaskLauncher.java | 54 ---- .../aurora/scheduler/TaskStatusHandler.java | 28 ++ .../aurora/scheduler/TaskStatusHandlerImpl.java | 194 ++++++++++++++ .../aurora/scheduler/UserTaskLauncher.java | 212 --------------- .../aurora/scheduler/async/AsyncModule.java | 4 +- .../scheduler/mesos/MesosSchedulerImpl.java | 45 +--- .../scheduler/TaskStatusHandlerImplTest.java | 232 ++++++++++++++++ .../aurora/scheduler/UserTaskLauncherTest.java | 265 ------------------- .../scheduler/mesos/MesosSchedulerImplTest.java | 141 ++++------ 11 files changed, 538 insertions(+), 690 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index 4c63cc7..308bbd9 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.benchmark; -import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -25,14 +24,13 @@ import java.util.logging.Logger; import javax.inject.Singleton; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; -import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.twitter.common.application.ShutdownStage; @@ -50,8 +48,8 @@ import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver; import org.apache.aurora.benchmark.fakes.FakeStatsProvider; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.TaskIdGenerator; -import org.apache.aurora.scheduler.TaskLauncher; -import org.apache.aurora.scheduler.UserTaskLauncher; +import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.TaskStatusHandlerImpl; import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; @@ -164,7 +162,7 @@ public class StatusUpdateBenchmark { private long latencyMilliseconds; private Scheduler scheduler; - private UserTaskLauncher userTaskLauncher; + private AbstractExecutionThreadService statusHandler; private SlowStorageWrapper storage; private EventBus eventBus; private Set tasks; @@ -243,18 +241,13 @@ public class StatusUpdateBenchmark { } }); bind(new TypeLiteral>() { }) - .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class) + .annotatedWith(TaskStatusHandlerImpl.StatusUpdateQueue.class) .toInstance(new LinkedBlockingQueue()); bind(new TypeLiteral() { }) - .annotatedWith(UserTaskLauncher.MaxBatchSize.class) + .annotatedWith(TaskStatusHandlerImpl.MaxBatchSize.class) .toInstance(1000); - bind(UserTaskLauncher.class).in(Singleton.class); - } - - @Provides - @Singleton - List provideTaskLaunchers(UserTaskLauncher launcher) { - return ImmutableList.of(launcher); + bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class); + bind(TaskStatusHandlerImpl.class).in(Singleton.class); } } ); @@ -263,13 +256,13 @@ public class StatusUpdateBenchmark { scheduler = injector.getInstance(Scheduler.class); eventBus.register(this); - userTaskLauncher = injector.getInstance(UserTaskLauncher.class); - userTaskLauncher.startAsync(); + statusHandler = injector.getInstance(TaskStatusHandlerImpl.class); + statusHandler.startAsync(); } @TearDown(Level.Trial) public void tearDown() { - userTaskLauncher.stopAsync(); + statusHandler.stopAsync(); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index ae31bdb..26093ef 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -21,10 +20,8 @@ import java.util.logging.Logger; import javax.inject.Singleton; -import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; -import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.twitter.common.args.Arg; @@ -39,6 +36,8 @@ import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.mesos.Protos; +import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding; + /** * Binding module for top-level scheduling logic. */ @@ -83,22 +82,17 @@ public class SchedulerModule extends AbstractModule { PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class); bind(TaskVars.class).in(Singleton.class); PubsubEventModule.bindSubscriber(binder(), TaskVars.class); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskVars.class); + addSchedulerActiveServiceBinding(binder()).to(TaskVars.class); bind(new TypeLiteral>() { }) - .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class) - .toInstance(new LinkedBlockingQueue()); + .annotatedWith(TaskStatusHandlerImpl.StatusUpdateQueue.class) + .toInstance(new LinkedBlockingQueue<>()); bind(new TypeLiteral() { }) - .annotatedWith(UserTaskLauncher.MaxBatchSize.class) + .annotatedWith(TaskStatusHandlerImpl.MaxBatchSize.class) .toInstance(MAX_STATUS_UPDATE_BATCH_SIZE.get()); - bind(UserTaskLauncher.class).in(Singleton.class); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(UserTaskLauncher.class); - } - - @Provides - @Singleton - List provideTaskLaunchers(UserTaskLauncher userTaskLauncher) { - return ImmutableList.of(userTaskLauncher); + bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class); + bind(TaskStatusHandlerImpl.class).in(Singleton.class); + addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java deleted file mode 100644 index cd55a6e..0000000 --- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.TaskStatus; - -/** - * A receiver of resource offers and task status updates. - */ -public interface TaskLauncher { - - /** - * Presents a resource offer to the task launcher, which will be passed to any subsequent task - * launchers if this one does not accept. - *

- * A task launcher may choose to retain an offer for later use. Any retained offers must be - * cleaned up with {@link #cancelOffer(OfferID)}. - * - * @param offer The resource offer. - * @return {@code false} if the launcher will not act on the offer, or {@code true} if the - * launcher may accept the offer at some point in the future. - */ - boolean willUse(HostOffer offer); - - /** - * Informs the launcher that a status update has been received for a task. If the task is not - * associated with the launcher, it should return {@code false} so that another launcher may - * receive it. - * - * @param status The status update. - * @return {@code true} if the status is relevant to the launcher and should not be delivered to - * other launchers, {@code false} otherwise. - */ - boolean statusUpdate(TaskStatus status); - - /** - * Informs the launcher that a previously-advertised offer is canceled and may not be used. - * - * @param offer The canceled offer. - */ - void cancelOffer(OfferID offer); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java new file mode 100644 index 0000000..3e132ee --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import org.apache.mesos.Protos.TaskStatus; + +/** + * A handler of task status updates. + */ +public interface TaskStatusHandler { + /** + * Informs the status handler that a status update has been received for a task. + * + * @param status The status update. + */ + void statusUpdate(TaskStatus status); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java new file mode 100644 index 0000000..538cb75 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java @@ -0,0 +1,194 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.google.common.util.concurrent.MoreExecutors; + +import com.twitter.common.stats.Stats; + +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.Conversions; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.mesos.Protos.TaskStatus; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +/** + * A {@link TaskStatusHandler} implementation. + */ +@VisibleForTesting +public class TaskStatusHandlerImpl extends AbstractExecutionThreadService + implements TaskStatusHandler { + + private static final Logger LOG = Logger.getLogger(TaskStatusHandlerImpl.class.getName()); + + @VisibleForTesting + static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested."; + + private static final String STATUS_STAT_FORMAT = "status_update_%s_%s"; + + private final Storage storage; + private final StateManager stateManager; + private final Driver driver; + private final BlockingQueue pendingUpdates; + private final int maxBatchSize; + private final CachedCounters counters; + + private final AtomicReference threadReference = new AtomicReference<>(); + + /** + * Binding annotation for the status update queue. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface StatusUpdateQueue { } + + /** + * Binding annotation maximum size of a status update batch. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface MaxBatchSize { } + + @Inject + TaskStatusHandlerImpl( + Storage storage, + StateManager stateManager, + final Driver driver, + @StatusUpdateQueue BlockingQueue pendingUpdates, + @MaxBatchSize Integer maxBatchSize, + CachedCounters counters) { + + this.storage = requireNonNull(storage); + this.stateManager = requireNonNull(stateManager); + this.driver = requireNonNull(driver); + this.pendingUpdates = requireNonNull(pendingUpdates); + this.maxBatchSize = requireNonNull(maxBatchSize); + this.counters = requireNonNull(counters); + + Stats.exportSize("status_updates_queue_size", this.pendingUpdates); + + addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + LOG.log(Level.SEVERE, "TaskStatusHandler failed: ", failure); + driver.abort(); + } + }, + MoreExecutors.sameThreadExecutor()); + } + + @Override + public void statusUpdate(TaskStatus status) { + pendingUpdates.add(status); + } + + @Override + protected void triggerShutdown() { + Thread thread = threadReference.get(); + + if (thread != null) { + thread.interrupt(); + } + } + + @Override + protected void run() { + threadReference.set(Thread.currentThread()); + + while (isRunning()) { + final Queue updates = new ArrayDeque<>(); + + try { + updates.add(pendingUpdates.take()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + + // Process all other available updates, up to the limit on batch size. + // TODO(bmahler): Expose histogram metrics of the batch sizes. + pendingUpdates.drainTo(updates, maxBatchSize - updates.size()); + + try { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + for (TaskStatus status : updates) { + ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); + + Optional message = Optional.absent(); + if (status.hasMessage()) { + message = Optional.of(status.getMessage()); + } + + if (translatedState == ScheduleStatus.FAILED && status.hasReason() + && status.getReason() == TaskStatus.Reason.REASON_MEMORY_LIMIT) { + message = Optional.of(MEMORY_LIMIT_DISPLAY); + } + + StateChangeResult result = stateManager.changeState( + storeProvider, + status.getTaskId().getValue(), + Optional.absent(), + translatedState, + message); + + if (status.hasReason()) { + counters.get(statName(status, result)).incrementAndGet(); + } + } + } + }); + + for (TaskStatus status : updates) { + driver.acknowledgeStatusUpdate(status); + } + } catch (RuntimeException e) { + LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e); + } + } + } + + @VisibleForTesting + static String statName(TaskStatus status, StateChangeResult result) { + return String.format(STATUS_STAT_FORMAT, status.getReason(), result); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java deleted file mode 100644 index 6bfbf0c..0000000 --- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.AbstractExecutionThreadService; -import com.google.common.util.concurrent.MoreExecutors; - -import com.twitter.common.stats.Stats; - -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.base.Conversions; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.TaskStatus; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.util.Objects.requireNonNull; - -/** - * A task launcher that matches resource offers against user tasks. - */ -@VisibleForTesting -public class UserTaskLauncher extends AbstractExecutionThreadService implements TaskLauncher { - - private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName()); - - @VisibleForTesting - static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested."; - - private static final String STATUS_STAT_FORMAT = "status_update_%s_%s"; - - private final Storage storage; - private final OfferManager offerManager; - private final StateManager stateManager; - private final Driver driver; - private final BlockingQueue pendingUpdates; - private final int maxBatchSize; - private final CachedCounters counters; - - private final AtomicReference threadReference = new AtomicReference<>(); - - /** - * Binding annotation for the status update queue. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface StatusUpdateQueue { } - - /** - * Binding annotation maximum size of a status update batch. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface MaxBatchSize { } - - @Inject - UserTaskLauncher( - Storage storage, - OfferManager offerManager, - StateManager stateManager, - final Driver driver, - @StatusUpdateQueue BlockingQueue pendingUpdates, - @MaxBatchSize Integer maxBatchSize, - CachedCounters counters) { - - this.storage = requireNonNull(storage); - this.offerManager = requireNonNull(offerManager); - this.stateManager = requireNonNull(stateManager); - this.driver = requireNonNull(driver); - this.pendingUpdates = requireNonNull(pendingUpdates); - this.maxBatchSize = requireNonNull(maxBatchSize); - this.counters = requireNonNull(counters); - - Stats.exportSize("status_updates_queue_size", this.pendingUpdates); - - addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - LOG.log(Level.SEVERE, "UserTaskLauncher failed: ", failure); - driver.abort(); - } - }, - MoreExecutors.sameThreadExecutor()); - } - - @Override - public boolean willUse(HostOffer offer) { - requireNonNull(offer); - - offerManager.addOffer(offer); - return true; - } - - @Override - public boolean statusUpdate(TaskStatus status) { - pendingUpdates.add(status); - return true; - } - - @Override - public void cancelOffer(OfferID offer) { - offerManager.cancelOffer(offer); - } - - @Override - protected void triggerShutdown() { - Thread thread = threadReference.get(); - - if (thread != null) { - thread.interrupt(); - } - } - - @Override - protected void run() { - threadReference.set(Thread.currentThread()); - - while (isRunning()) { - final Queue updates = new ArrayDeque<>(); - - try { - updates.add(pendingUpdates.take()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - - // Process all other available updates, up to the limit on batch size. - // TODO(bmahler): Expose histogram metrics of the batch sizes. - pendingUpdates.drainTo(updates, maxBatchSize - updates.size()); - - try { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - for (TaskStatus status : updates) { - ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); - - Optional message = Optional.absent(); - if (status.hasMessage()) { - message = Optional.of(status.getMessage()); - } - - if (translatedState == ScheduleStatus.FAILED && status.hasReason() - && status.getReason() == TaskStatus.Reason.REASON_MEMORY_LIMIT) { - message = Optional.of(MEMORY_LIMIT_DISPLAY); - } - - StateChangeResult result = stateManager.changeState( - storeProvider, - status.getTaskId().getValue(), - Optional.absent(), - translatedState, - message); - - if (status.hasReason()) { - counters.get(statName(status, result)).incrementAndGet(); - } - } - } - }); - - for (TaskStatus status : updates) { - driver.acknowledgeStatusUpdate(status); - } - } catch (RuntimeException e) { - LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e); - } - } - } - - @VisibleForTesting - static String statName(TaskStatus status, StateChangeResult result) { - return String.format(STATUS_STAT_FORMAT, status.getReason(), result); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index 17b3585..8c2d751 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -170,10 +170,12 @@ public class AsyncModule extends AbstractModule { private static final Arg> RESERVATION_DURATION = Arg.create(Amount.of(3L, Time.MINUTES)); + // Reconciliation may create a big surge of status updates in a large cluster. Setting the default + // initial delay to 1 minute to ease up storage contention during scheduler start up. @CmdLine(name = "reconciliation_initial_delay", help = "Initial amount of time to delay task reconciliation after scheduler start up.") private static final Arg> RECONCILIATION_INITIAL_DELAY = - Arg.create(Amount.of(0L, Time.MINUTES)); + Arg.create(Amount.of(1L, Time.MINUTES)); @Positive @CmdLine(name = "reconciliation_explicit_interval", http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index 4f7a1be..33749de 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -32,7 +32,8 @@ import com.twitter.common.inject.TimedInterceptor.Timed; import org.apache.aurora.GuiceUtils.AllowUnchecked; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TaskLauncher; +import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; @@ -67,8 +68,8 @@ import static org.apache.mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION; */ @VisibleForTesting public class MesosSchedulerImpl implements Scheduler { - private final List taskLaunchers; - + private final TaskStatusHandler taskStatusHandler; + private final OfferManager offerManager; private final Storage storage; private final Lifecycle lifecycle; private final EventSink eventSink; @@ -90,11 +91,8 @@ public class MesosSchedulerImpl implements Scheduler { * * @param storage Store to save host attributes into. * @param lifecycle Application lifecycle manager. - * @param taskLaunchers Task launchers, which will be used in order. Calls to - * {@link TaskLauncher#willUse(HostOffer)} and - * {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided - * launchers, ceasing after the first match (based on a return value of - * {@code true}. + * @param taskStatusHandler Task status update manager. + * @param offerManager Offer manager. * @param eventSink Pubsub sink to send driver status changes to. * @param executor Executor for async work */ @@ -102,7 +100,8 @@ public class MesosSchedulerImpl implements Scheduler { public MesosSchedulerImpl( Storage storage, final Lifecycle lifecycle, - List taskLaunchers, + TaskStatusHandler taskStatusHandler, + OfferManager offerManager, EventSink eventSink, @SchedulerExecutor Executor executor, Logger log, @@ -110,7 +109,8 @@ public class MesosSchedulerImpl implements Scheduler { this.storage = requireNonNull(storage); this.lifecycle = requireNonNull(lifecycle); - this.taskLaunchers = requireNonNull(taskLaunchers); + this.taskStatusHandler = requireNonNull(taskStatusHandler); + this.offerManager = requireNonNull(offerManager); this.eventSink = requireNonNull(eventSink); this.executor = requireNonNull(executor); this.log = requireNonNull(log); @@ -174,11 +174,7 @@ public class MesosSchedulerImpl implements Scheduler { log.log(Level.FINE, String.format("Received offer: %s", offer)); } counters.get("scheduler_resource_offers").incrementAndGet(); - for (TaskLauncher launcher : taskLaunchers) { - if (launcher.willUse(new HostOffer(offer, attributes))) { - break; - } - } + offerManager.addOffer(new HostOffer(offer, attributes)); } } }); @@ -189,9 +185,7 @@ public class MesosSchedulerImpl implements Scheduler { @Override public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) { log.info("Offer rescinded: " + offerId); - for (TaskLauncher launcher : taskLaunchers) { - launcher.cancelOffer(offerId); - } + offerManager.cancelOffer(offerId); } private static void logStatusUpdate(Logger logger, TaskStatus status) { @@ -236,24 +230,13 @@ public class MesosSchedulerImpl implements Scheduler { Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS))); try { - for (TaskLauncher launcher : taskLaunchers) { - if (launcher.statusUpdate(status)) { - // The launcher is responsible for acknowledging the update. - return; - } - } + // The status handler is responsible for acknowledging the update. + taskStatusHandler.statusUpdate(status); } catch (SchedulerException e) { log.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e); // We re-throw the exception here to trigger an abort of the driver. throw e; } - - log.warning("Unhandled status update " + status); - counters.get("scheduler_status_updates").incrementAndGet(); - - // Even though we have not handled this update, we acknowledge it to prevent an unbounded - // growth of status update retries from Mesos. - driver.acknowledgeStatusUpdate(status); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java b/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java new file mode 100644 index 0000000..9d1e251 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java @@ -0,0 +1,232 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; + +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.Protos.TaskState; +import org.apache.mesos.Protos.TaskStatus; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.FAILED; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.TaskStatusHandlerImpl.statName; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TaskStatusHandlerImplTest extends EasyMockTest { + + private static final String TASK_ID_A = "task_id_a"; + + private StateManager stateManager; + private StorageTestUtil storageUtil; + private Driver driver; + private BlockingQueue queue; + private FakeStatsProvider stats; + + private TaskStatusHandlerImpl statusHandler; + + @Before + public void setUp() { + stateManager = createMock(StateManager.class); + storageUtil = new StorageTestUtil(this); + driver = createMock(Driver.class); + queue = new LinkedBlockingQueue<>(); + stats = new FakeStatsProvider(); + + statusHandler = new TaskStatusHandlerImpl( + storageUtil.storage, + stateManager, + driver, + queue, + 1000, + new CachedCounters(stats)); + + statusHandler.startAsync(); + } + + @After + public void after() { + statusHandler.stopAsync(); + } + + @Test + public void testForwardsStatusUpdates() throws Exception { + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setReason(TaskStatus.Reason.REASON_RECONCILIATION) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setMessage("fake message") + .build(); + + storageUtil.expectWrite(); + + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID_A, + Optional.absent(), + RUNNING, + Optional.of("fake message"))) + .andReturn(StateChangeResult.SUCCESS); + + final CountDownLatch latch = new CountDownLatch(1); + + driver.acknowledgeStatusUpdate(status); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + control.replay(); + + statusHandler.statusUpdate(status); + assertTrue(latch.await(5L, TimeUnit.SECONDS)); + assertEquals(1L, stats.getValue(statName(status, StateChangeResult.SUCCESS))); + } + + @Test + public void testFailedStatusUpdate() throws Exception { + storageUtil.expectWrite(); + + final CountDownLatch latch = new CountDownLatch(1); + + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID_A, + Optional.absent(), + RUNNING, + Optional.of("fake message"))) + .andAnswer(() -> { + latch.countDown(); + throw new StorageException("Injected error"); + }); + + control.replay(); + + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setMessage("fake message") + .build(); + + statusHandler.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); + } + + @Test + public void testMemoryLimitTranslation() throws Exception { + storageUtil.expectWrite(); + + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_FAILED) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setReason(TaskStatus.Reason.REASON_MEMORY_LIMIT) + .setMessage("Some Message") + .build(); + + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID_A, + Optional.absent(), + FAILED, + Optional.of(TaskStatusHandlerImpl.MEMORY_LIMIT_DISPLAY))) + .andReturn(StateChangeResult.SUCCESS); + + final CountDownLatch latch = new CountDownLatch(1); + + driver.acknowledgeStatusUpdate(status); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + control.replay(); + + statusHandler.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); + } + + @Test + public void testThreadFailure() throws Exception { + // Re-create the objects from @Before, since we need to inject a mock queue. + statusHandler.stopAsync(); + statusHandler.awaitTerminated(); + + stateManager = createMock(StateManager.class); + storageUtil = new StorageTestUtil(this); + driver = createMock(Driver.class); + queue = createMock(BlockingQueue.class); + + statusHandler = new TaskStatusHandlerImpl( + storageUtil.storage, + stateManager, + driver, + queue, + 1000, + new CachedCounters(stats)); + + expect(queue.add(EasyMock.anyObject())) + .andReturn(true); + + expect(queue.take()) + .andAnswer(() -> { + throw new RuntimeException(); + }); + + final CountDownLatch latch = new CountDownLatch(1); + + driver.abort(); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + control.replay(); + + statusHandler.startAsync(); + + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setMessage("fake message") + .build(); + + statusHandler.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java deleted file mode 100644 index 126001a..0000000 --- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Optional; -import com.twitter.common.collections.Pair; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.mesos.Offers; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.TaskState; -import org.apache.mesos.Protos.TaskStatus; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.FAILED; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.UserTaskLauncher.statName; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class UserTaskLauncherTest extends EasyMockTest { - - private static final String TASK_ID_A = "task_id_a"; - - private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build(); - private static final HostOffer OFFER = new HostOffer( - Offers.createOffer(4, 1024, 1024, Pair.of(80, 80)), - IHostAttributes.build(new HostAttributes())); - - private OfferManager offerManager; - private StateManager stateManager; - private StorageTestUtil storageUtil; - private Driver driver; - private BlockingQueue queue; - private FakeStatsProvider stats; - - private UserTaskLauncher launcher; - - @Before - public void setUp() { - offerManager = createMock(OfferManager.class); - stateManager = createMock(StateManager.class); - storageUtil = new StorageTestUtil(this); - driver = createMock(Driver.class); - queue = new LinkedBlockingQueue<>(); - stats = new FakeStatsProvider(); - - launcher = new UserTaskLauncher( - storageUtil.storage, - offerManager, - stateManager, - driver, - queue, - 1000, - new CachedCounters(stats)); - - launcher.startAsync(); - } - - @After - public void after() { - launcher.stopAsync(); - } - - @Test - public void testForwardsOffers() throws Exception { - offerManager.addOffer(OFFER); - - control.replay(); - - assertTrue(launcher.willUse(OFFER)); - } - - @Test - public void testForwardsStatusUpdates() throws Exception { - TaskStatus status = TaskStatus.newBuilder() - .setState(TaskState.TASK_RUNNING) - .setReason(TaskStatus.Reason.REASON_RECONCILIATION) - .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) - .setMessage("fake message") - .build(); - - storageUtil.expectWrite(); - - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID_A, - Optional.absent(), - RUNNING, - Optional.of("fake message"))) - .andReturn(StateChangeResult.SUCCESS); - - final CountDownLatch latch = new CountDownLatch(1); - - driver.acknowledgeStatusUpdate(status); - expectLastCall().andAnswer(() -> { - latch.countDown(); - return null; - }); - - control.replay(); - - assertTrue(launcher.statusUpdate(status)); - assertTrue(latch.await(5L, TimeUnit.SECONDS)); - assertEquals(1L, stats.getValue(statName(status, StateChangeResult.SUCCESS))); - } - - @Test - public void testForwardsRescindedOffers() throws Exception { - launcher.cancelOffer(OFFER_ID); - - control.replay(); - - launcher.cancelOffer(OFFER_ID); - } - - @Test - public void testFailedStatusUpdate() throws Exception { - storageUtil.expectWrite(); - - final CountDownLatch latch = new CountDownLatch(1); - - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID_A, - Optional.absent(), - RUNNING, - Optional.of("fake message"))) - .andAnswer(() -> { - latch.countDown(); - throw new StorageException("Injected error"); - }); - - control.replay(); - - TaskStatus status = TaskStatus.newBuilder() - .setState(TaskState.TASK_RUNNING) - .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) - .setMessage("fake message") - .build(); - - launcher.statusUpdate(status); - - assertTrue(latch.await(5L, TimeUnit.SECONDS)); - } - - @Test - public void testMemoryLimitTranslation() throws Exception { - storageUtil.expectWrite(); - - TaskStatus status = TaskStatus.newBuilder() - .setState(TaskState.TASK_FAILED) - .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) - .setReason(TaskStatus.Reason.REASON_MEMORY_LIMIT) - .setMessage("Some Message") - .build(); - - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID_A, - Optional.absent(), - FAILED, - Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY))) - .andReturn(StateChangeResult.SUCCESS); - - final CountDownLatch latch = new CountDownLatch(1); - - driver.acknowledgeStatusUpdate(status); - expectLastCall().andAnswer(() -> { - latch.countDown(); - return null; - }); - - control.replay(); - - launcher.statusUpdate(status); - - assertTrue(latch.await(5L, TimeUnit.SECONDS)); - } - - @Test - public void testThreadFailure() throws Exception { - // Re-create the objects from @Before, since we need to inject a mock queue. - launcher.stopAsync(); - launcher.awaitTerminated(); - - offerManager = createMock(OfferManager.class); - stateManager = createMock(StateManager.class); - storageUtil = new StorageTestUtil(this); - driver = createMock(Driver.class); - queue = createMock(BlockingQueue.class); - - launcher = new UserTaskLauncher( - storageUtil.storage, - offerManager, - stateManager, - driver, - queue, - 1000, - new CachedCounters(stats)); - - expect(queue.add(EasyMock.anyObject())) - .andReturn(true); - - expect(queue.take()) - .andAnswer(() -> { - throw new RuntimeException(); - }); - - final CountDownLatch latch = new CountDownLatch(1); - - driver.abort(); - expectLastCall().andAnswer(() -> { - latch.countDown(); - return null; - }); - - control.replay(); - - launcher.startAsync(); - - TaskStatus status = TaskStatus.newBuilder() - .setState(TaskState.TASK_RUNNING) - .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) - .setMessage("fake message") - .build(); - - launcher.statusUpdate(status); - - assertTrue(latch.await(5L, TimeUnit.SECONDS)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java index e4e1587..f08c799 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java @@ -32,7 +32,8 @@ import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TaskLauncher; +import org.apache.aurora.scheduler.TaskStatusHandler; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; @@ -44,7 +45,6 @@ import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.MasterInfo; @@ -65,6 +65,7 @@ import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertTrue; public class MesosSchedulerImplTest extends EasyMockTest { @@ -140,8 +141,8 @@ public class MesosSchedulerImplTest extends EasyMockTest { private Logger log; private StorageTestUtil storageUtil; private Command shutdownCommand; - private TaskLauncher systemLauncher; - private TaskLauncher userLauncher; + private TaskStatusHandler statusHandler; + private OfferManager offerManager; private SchedulerDriver driver; private EventSink eventSink; @@ -159,14 +160,15 @@ public class MesosSchedulerImplTest extends EasyMockTest { shutdownCommand = createMock(Command.class); final Lifecycle lifecycle = new Lifecycle(shutdownCommand, createMock(UncaughtExceptionHandler.class)); - systemLauncher = createMock(TaskLauncher.class); - userLauncher = createMock(TaskLauncher.class); + statusHandler = createMock(TaskStatusHandler.class); + offerManager = createMock(OfferManager.class); eventSink = createMock(EventSink.class); scheduler = new MesosSchedulerImpl( storageUtil.storage, lifecycle, - ImmutableList.of(systemLauncher, userLauncher), + statusHandler, + offerManager, eventSink, MoreExecutors.sameThreadExecutor(), logger, @@ -193,48 +195,29 @@ public class MesosSchedulerImplTest extends EasyMockTest { } @Test - public void testNoAccepts() { + public void testAcceptOffer() { new OfferFixture() { @Override void respondToOffer() { expectOfferAttributesSaved(OFFER); - expect(systemLauncher.willUse(OFFER)).andReturn(false); - expect(userLauncher.willUse(OFFER)).andReturn(false); + offerManager.addOffer(OFFER); } }.run(); } @Test - public void testOfferFirstAccepts() { - new OfferFixture() { - @Override - void respondToOffer() { - expectOfferAttributesSaved(OFFER); - expect(systemLauncher.willUse(OFFER)).andReturn(true); - } - }.run(); - } - - @Test - public void testOfferFirstAcceptsFineLogging() { - log.setLevel(Level.FINE); - new OfferFixture() { - @Override - void respondToOffer() { - expectOfferAttributesSaved(OFFER); - expect(systemLauncher.willUse(OFFER)).andReturn(true); - } - }.run(); - } + public void testAcceptOfferFineLogging() { + Logger mockLogger = createMock(Logger.class); + mockLogger.info(anyString()); + expect(mockLogger.isLoggable(Level.FINE)).andReturn(true); + mockLogger.log(eq(Level.FINE), anyString()); + initializeScheduler(mockLogger); - @Test - public void testOfferSchedulerAccepts() { new OfferFixture() { @Override void respondToOffer() { expectOfferAttributesSaved(OFFER); - expect(systemLauncher.willUse(OFFER)).andReturn(false); - expect(userLauncher.willUse(OFFER)).andReturn(true); + offerManager.addOffer(OFFER); } }.run(); } @@ -253,67 +236,24 @@ public class MesosSchedulerImplTest extends EasyMockTest { expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true); HostOffer offer = new HostOffer(OFFER.getOffer(), draining); - expect(systemLauncher.willUse(offer)).andReturn(false); - expect(userLauncher.willUse(offer)).andReturn(true); + offerManager.addOffer(offer); } }.run(); } @Test - public void testStatusUpdateNoAccepts() { - new StatusFixture() { - @Override - void expectations() { - eventSink.post(PUBSUB_EVENT); - expect(systemLauncher.statusUpdate(status)).andReturn(false); - expect(userLauncher.statusUpdate(status)).andReturn(false); - expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING); - } - }.run(); - } - - private class FirstLauncherAccepts extends StatusFixture { - FirstLauncherAccepts(TaskStatus status) { - super(status); - } - - @Override - void expectations() { - eventSink.post(new TaskStatusReceived( - status.getState(), - Optional.fromNullable(status.getSource()), - Optional.fromNullable(status.getReason()), - Optional.of(1000000L) - )); - expect(systemLauncher.statusUpdate(status)).andReturn(true); - } - } - - @Test - public void testStatusUpdateFirstAccepts() { + public void testStatusUpdate() { // Test multiple variations of fields in TaskStatus to cover all branches. - new FirstLauncherAccepts(STATUS).run(); + new StatusUpdater(STATUS).run(); control.verify(); control.reset(); - new FirstLauncherAccepts(STATUS.toBuilder().clearSource().build()).run(); + new StatusUpdater(STATUS.toBuilder().clearSource().build()).run(); control.verify(); control.reset(); - new FirstLauncherAccepts(STATUS.toBuilder().clearReason().build()).run(); + new StatusUpdater(STATUS.toBuilder().clearReason().build()).run(); control.verify(); control.reset(); - new FirstLauncherAccepts(STATUS.toBuilder().clearMessage().build()).run(); - } - - @Test - public void testStatusUpdateSecondAccepts() { - new StatusFixture() { - @Override - void expectations() { - eventSink.post(PUBSUB_EVENT); - expect(systemLauncher.statusUpdate(status)).andReturn(false); - expect(userLauncher.statusUpdate(status)).andReturn(true); - } - }.run(); + new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run(); } @Test(expected = SchedulerException.class) @@ -322,8 +262,8 @@ public class MesosSchedulerImplTest extends EasyMockTest { @Override void expectations() { eventSink.post(PUBSUB_EVENT); - expect(systemLauncher.statusUpdate(status)).andReturn(false); - expect(userLauncher.statusUpdate(status)).andThrow(new StorageException("Injected.")); + statusHandler.statusUpdate(status); + expectLastCall().andThrow(new StorageException("Injected.")); } }.run(); } @@ -335,10 +275,8 @@ public class MesosSchedulerImplTest extends EasyMockTest { void expectations() { expectOfferAttributesSaved(OFFER); expectOfferAttributesSaved(OFFER_2); - expect(systemLauncher.willUse(OFFER)).andReturn(false); - expect(userLauncher.willUse(OFFER)).andReturn(true); - expect(systemLauncher.willUse(OFFER_2)).andReturn(false); - expect(userLauncher.willUse(OFFER_2)).andReturn(false); + offerManager.addOffer(OFFER); + offerManager.addOffer(OFFER_2); } @Override @@ -390,8 +328,7 @@ public class MesosSchedulerImplTest extends EasyMockTest { @Test public void testOfferRescinded() { - systemLauncher.cancelOffer(OFFER_ID); - userLauncher.cancelOffer(OFFER_ID); + offerManager.cancelOffer(OFFER_ID); control.replay(); @@ -425,12 +362,28 @@ public class MesosSchedulerImplTest extends EasyMockTest { @Override void expectations() { eventSink.post(PUBSUB_RECONCILIATION_EVENT); - expect(systemLauncher.statusUpdate(status)).andReturn(false); - expect(userLauncher.statusUpdate(status)).andReturn(true); + statusHandler.statusUpdate(status); } }.run(); } + private class StatusUpdater extends StatusFixture { + StatusUpdater(TaskStatus status) { + super(status); + } + + @Override + void expectations() { + eventSink.post(new TaskStatusReceived( + status.getState(), + Optional.fromNullable(status.getSource()), + Optional.fromNullable(status.getReason()), + Optional.of(1000000L) + )); + statusHandler.statusUpdate(status); + } + } + private void expectOfferAttributesSaved(HostOffer offer) { expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname())) .andReturn(Optional.absent());