aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Refactoring TaskLauncher.
Date Tue, 23 Jun 2015 20:43:11 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 68c46205e -> 4b8c34c0e


Refactoring TaskLauncher.

Bugs closed: AURORA-1334

Reviewed at https://reviews.apache.org/r/35761/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4b8c34c0
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4b8c34c0
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4b8c34c0

Branch: refs/heads/master
Commit: 4b8c34c0e924b9fdd170eb7f05bd6b92580ce7b8
Parents: 68c4620
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Jun 23 13:42:54 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Jun 23 13:42:54 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/StatusUpdateBenchmark.java |  29 +-
 .../aurora/scheduler/SchedulerModule.java       |  24 +-
 .../apache/aurora/scheduler/TaskLauncher.java   |  54 ----
 .../aurora/scheduler/TaskStatusHandler.java     |  28 ++
 .../aurora/scheduler/TaskStatusHandlerImpl.java | 194 ++++++++++++++
 .../aurora/scheduler/UserTaskLauncher.java      | 212 ---------------
 .../aurora/scheduler/async/AsyncModule.java     |   4 +-
 .../scheduler/mesos/MesosSchedulerImpl.java     |  45 +---
 .../scheduler/TaskStatusHandlerImplTest.java    | 232 ++++++++++++++++
 .../aurora/scheduler/UserTaskLauncherTest.java  | 265 -------------------
 .../scheduler/mesos/MesosSchedulerImplTest.java | 141 ++++------
 11 files changed, 538 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 4c63cc7..308bbd9 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.benchmark;
 
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -25,14 +24,13 @@ import java.util.logging.Logger;
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 
 import com.twitter.common.application.ShutdownStage;
@@ -50,8 +48,8 @@ import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver;
 import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.TaskLauncher;
-import org.apache.aurora.scheduler.UserTaskLauncher;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.TaskStatusHandlerImpl;
 import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
@@ -164,7 +162,7 @@ public class StatusUpdateBenchmark {
   private long latencyMilliseconds;
 
   private Scheduler scheduler;
-  private UserTaskLauncher userTaskLauncher;
+  private AbstractExecutionThreadService statusHandler;
   private SlowStorageWrapper storage;
   private EventBus eventBus;
   private Set<IScheduledTask> tasks;
@@ -243,18 +241,13 @@ public class StatusUpdateBenchmark {
               }
             });
             bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { })
-                .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class)
+                .annotatedWith(TaskStatusHandlerImpl.StatusUpdateQueue.class)
                 .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>());
             bind(new TypeLiteral<Integer>() { })
-                .annotatedWith(UserTaskLauncher.MaxBatchSize.class)
+                .annotatedWith(TaskStatusHandlerImpl.MaxBatchSize.class)
                 .toInstance(1000);
-            bind(UserTaskLauncher.class).in(Singleton.class);
-          }
-
-          @Provides
-          @Singleton
-          List<TaskLauncher> provideTaskLaunchers(UserTaskLauncher launcher) {
-            return ImmutableList.<TaskLauncher>of(launcher);
+            bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
+            bind(TaskStatusHandlerImpl.class).in(Singleton.class);
           }
         }
     );
@@ -263,13 +256,13 @@ public class StatusUpdateBenchmark {
     scheduler = injector.getInstance(Scheduler.class);
     eventBus.register(this);
 
-    userTaskLauncher = injector.getInstance(UserTaskLauncher.class);
-    userTaskLauncher.startAsync();
+    statusHandler = injector.getInstance(TaskStatusHandlerImpl.class);
+    statusHandler.startAsync();
   }
 
   @TearDown(Level.Trial)
   public void tearDown() {
-    userTaskLauncher.stopAsync();
+    statusHandler.stopAsync();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index ae31bdb..26093ef 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler;
 
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -21,10 +20,8 @@ import java.util.logging.Logger;
 
 import javax.inject.Singleton;
 
-import com.google.common.collect.ImmutableList;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 
 import com.twitter.common.args.Arg;
@@ -39,6 +36,8 @@ import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.mesos.Protos;
 
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
+
 /**
  * Binding module for top-level scheduling logic.
  */
@@ -83,22 +82,17 @@ public class SchedulerModule extends AbstractModule {
     PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
     bind(TaskVars.class).in(Singleton.class);
     PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
 
     bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { })
-        .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class)
-        .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>());
+        .annotatedWith(TaskStatusHandlerImpl.StatusUpdateQueue.class)
+        .toInstance(new LinkedBlockingQueue<>());
     bind(new TypeLiteral<Integer>() { })
-        .annotatedWith(UserTaskLauncher.MaxBatchSize.class)
+        .annotatedWith(TaskStatusHandlerImpl.MaxBatchSize.class)
         .toInstance(MAX_STATUS_UPDATE_BATCH_SIZE.get());
 
-    bind(UserTaskLauncher.class).in(Singleton.class);
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(UserTaskLauncher.class);
-  }
-
-  @Provides
-  @Singleton
-  List<TaskLauncher> provideTaskLaunchers(UserTaskLauncher userTaskLauncher) {
-    return ImmutableList.of(userTaskLauncher);
+    bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
+    bind(TaskStatusHandlerImpl.class).in(Singleton.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
deleted file mode 100644
index cd55a6e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskStatus;
-
-/**
- * A receiver of resource offers and task status updates.
- */
-public interface TaskLauncher {
-
-  /**
-   * Presents a resource offer to the task launcher, which will be passed to any subsequent task
-   * launchers if this one does not accept.
-   * <p>
-   * A task launcher may choose to retain an offer for later use.  Any retained offers must be
-   * cleaned up with {@link #cancelOffer(OfferID)}.
-   *
-   * @param offer The resource offer.
-   * @return {@code false} if the launcher will not act on the offer, or {@code true} if the
-   *         launcher may accept the offer at some point in the future.
-   */
-  boolean willUse(HostOffer offer);
-
-  /**
-   * Informs the launcher that a status update has been received for a task.  If the task is not
-   * associated with the launcher, it should return {@code false} so that another launcher may
-   * receive it.
-   *
-   * @param status The status update.
-   * @return {@code true} if the status is relevant to the launcher and should not be delivered to
-   * other launchers, {@code false} otherwise.
-   */
-  boolean statusUpdate(TaskStatus status);
-
-  /**
-   * Informs the launcher that a previously-advertised offer is canceled and may not be used.
-   *
-   * @param offer The canceled offer.
-   */
-  void cancelOffer(OfferID offer);
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java
new file mode 100644
index 0000000..3e132ee
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import org.apache.mesos.Protos.TaskStatus;
+
+/**
+ * A handler of task status updates.
+ */
+public interface TaskStatusHandler {
+  /**
+   * Informs the status handler that a status update has been received for a task.
+   *
+   * @param status The status update.
+   */
+  void statusUpdate(TaskStatus status);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java
new file mode 100644
index 0000000..538cb75
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.Protos.TaskStatus;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link TaskStatusHandler} implementation.
+ */
+@VisibleForTesting
+public class TaskStatusHandlerImpl extends AbstractExecutionThreadService
+    implements TaskStatusHandler {
+
+  private static final Logger LOG = Logger.getLogger(TaskStatusHandlerImpl.class.getName());
+
+  @VisibleForTesting
+  static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
+
+  private static final String STATUS_STAT_FORMAT = "status_update_%s_%s";
+
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final Driver driver;
+  private final BlockingQueue<TaskStatus> pendingUpdates;
+  private final int maxBatchSize;
+  private final CachedCounters counters;
+
+  private final AtomicReference<Thread> threadReference = new AtomicReference<>();
+
+  /**
+   * Binding annotation for the status update queue.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface StatusUpdateQueue { }
+
+  /**
+   * Binding annotation maximum size of a status update batch.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface MaxBatchSize { }
+
+  @Inject
+  TaskStatusHandlerImpl(
+      Storage storage,
+      StateManager stateManager,
+      final Driver driver,
+      @StatusUpdateQueue BlockingQueue<TaskStatus> pendingUpdates,
+      @MaxBatchSize Integer maxBatchSize,
+      CachedCounters counters) {
+
+    this.storage = requireNonNull(storage);
+    this.stateManager = requireNonNull(stateManager);
+    this.driver = requireNonNull(driver);
+    this.pendingUpdates = requireNonNull(pendingUpdates);
+    this.maxBatchSize = requireNonNull(maxBatchSize);
+    this.counters = requireNonNull(counters);
+
+    Stats.exportSize("status_updates_queue_size", this.pendingUpdates);
+
+    addListener(
+        new Listener() {
+          @Override
+          public void failed(State from, Throwable failure) {
+            LOG.log(Level.SEVERE, "TaskStatusHandler failed: ", failure);
+            driver.abort();
+          }
+        },
+        MoreExecutors.sameThreadExecutor());
+  }
+
+  @Override
+  public void statusUpdate(TaskStatus status) {
+    pendingUpdates.add(status);
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    Thread thread = threadReference.get();
+
+    if (thread != null) {
+      thread.interrupt();
+    }
+  }
+
+  @Override
+  protected void run() {
+    threadReference.set(Thread.currentThread());
+
+    while (isRunning()) {
+      final Queue<TaskStatus> updates = new ArrayDeque<>();
+
+      try {
+        updates.add(pendingUpdates.take());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+
+      // Process all other available updates, up to the limit on batch size.
+      // TODO(bmahler): Expose histogram metrics of the batch sizes.
+      pendingUpdates.drainTo(updates, maxBatchSize - updates.size());
+
+      try {
+        storage.write(new Storage.MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(Storage.MutableStoreProvider storeProvider) {
+            for (TaskStatus status : updates) {
+              ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+
+              Optional<String> message = Optional.absent();
+              if (status.hasMessage()) {
+                message = Optional.of(status.getMessage());
+              }
+
+              if (translatedState == ScheduleStatus.FAILED && status.hasReason()
+                  && status.getReason() == TaskStatus.Reason.REASON_MEMORY_LIMIT) {
+                message = Optional.of(MEMORY_LIMIT_DISPLAY);
+              }
+
+              StateChangeResult result = stateManager.changeState(
+                  storeProvider,
+                  status.getTaskId().getValue(),
+                  Optional.<ScheduleStatus>absent(),
+                  translatedState,
+                  message);
+
+              if (status.hasReason()) {
+                counters.get(statName(status, result)).incrementAndGet();
+              }
+            }
+          }
+        });
+
+        for (TaskStatus status : updates) {
+          driver.acknowledgeStatusUpdate(status);
+        }
+      } catch (RuntimeException e) {
+        LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static String statName(TaskStatus status, StateChangeResult result) {
+    return String.format(STATUS_STAT_FORMAT, status.getReason(), result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
deleted file mode 100644
index 6bfbf0c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskStatus;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-/**
- * A task launcher that matches resource offers against user tasks.
- */
-@VisibleForTesting
-public class UserTaskLauncher extends AbstractExecutionThreadService implements TaskLauncher {
-
-  private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
-
-  @VisibleForTesting
-  static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
-
-  private static final String STATUS_STAT_FORMAT = "status_update_%s_%s";
-
-  private final Storage storage;
-  private final OfferManager offerManager;
-  private final StateManager stateManager;
-  private final Driver driver;
-  private final BlockingQueue<TaskStatus> pendingUpdates;
-  private final int maxBatchSize;
-  private final CachedCounters counters;
-
-  private final AtomicReference<Thread> threadReference = new AtomicReference<>();
-
-  /**
-   * Binding annotation for the status update queue.
-   */
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface StatusUpdateQueue { }
-
-  /**
-   * Binding annotation maximum size of a status update batch.
-   */
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface MaxBatchSize { }
-
-  @Inject
-  UserTaskLauncher(
-      Storage storage,
-      OfferManager offerManager,
-      StateManager stateManager,
-      final Driver driver,
-      @StatusUpdateQueue BlockingQueue<TaskStatus> pendingUpdates,
-      @MaxBatchSize Integer maxBatchSize,
-      CachedCounters counters) {
-
-    this.storage = requireNonNull(storage);
-    this.offerManager = requireNonNull(offerManager);
-    this.stateManager = requireNonNull(stateManager);
-    this.driver = requireNonNull(driver);
-    this.pendingUpdates = requireNonNull(pendingUpdates);
-    this.maxBatchSize = requireNonNull(maxBatchSize);
-    this.counters = requireNonNull(counters);
-
-    Stats.exportSize("status_updates_queue_size", this.pendingUpdates);
-
-    addListener(
-        new Listener() {
-          @Override
-          public void failed(State from, Throwable failure) {
-            LOG.log(Level.SEVERE, "UserTaskLauncher failed: ", failure);
-            driver.abort();
-          }
-        },
-        MoreExecutors.sameThreadExecutor());
-  }
-
-  @Override
-  public boolean willUse(HostOffer offer) {
-    requireNonNull(offer);
-
-    offerManager.addOffer(offer);
-    return true;
-  }
-
-  @Override
-  public boolean statusUpdate(TaskStatus status) {
-    pendingUpdates.add(status);
-    return true;
-  }
-
-  @Override
-  public void cancelOffer(OfferID offer) {
-    offerManager.cancelOffer(offer);
-  }
-
-  @Override
-  protected void triggerShutdown() {
-    Thread thread = threadReference.get();
-
-    if (thread != null) {
-      thread.interrupt();
-    }
-  }
-
-  @Override
-  protected void run() {
-    threadReference.set(Thread.currentThread());
-
-    while (isRunning()) {
-      final Queue<TaskStatus> updates = new ArrayDeque<>();
-
-      try {
-        updates.add(pendingUpdates.take());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        break;
-      }
-
-      // Process all other available updates, up to the limit on batch size.
-      // TODO(bmahler): Expose histogram metrics of the batch sizes.
-      pendingUpdates.drainTo(updates, maxBatchSize - updates.size());
-
-      try {
-        storage.write(new Storage.MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(Storage.MutableStoreProvider storeProvider) {
-            for (TaskStatus status : updates) {
-              ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
-
-              Optional<String> message = Optional.absent();
-              if (status.hasMessage()) {
-                message = Optional.of(status.getMessage());
-              }
-
-              if (translatedState == ScheduleStatus.FAILED && status.hasReason()
-                  && status.getReason() == TaskStatus.Reason.REASON_MEMORY_LIMIT) {
-                message = Optional.of(MEMORY_LIMIT_DISPLAY);
-              }
-
-              StateChangeResult result = stateManager.changeState(
-                  storeProvider,
-                  status.getTaskId().getValue(),
-                  Optional.<ScheduleStatus>absent(),
-                  translatedState,
-                  message);
-
-              if (status.hasReason()) {
-                counters.get(statName(status, result)).incrementAndGet();
-              }
-            }
-          }
-        });
-
-        for (TaskStatus status : updates) {
-          driver.acknowledgeStatusUpdate(status);
-        }
-      } catch (RuntimeException e) {
-        LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  static String statName(TaskStatus status, StateChangeResult result) {
-    return String.format(STATUS_STAT_FORMAT, status.getReason(), result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 17b3585..8c2d751 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -170,10 +170,12 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
       Arg.create(Amount.of(3L, Time.MINUTES));
 
+  // Reconciliation may create a big surge of status updates in a large cluster. Setting the default
+  // initial delay to 1 minute to ease up storage contention during scheduler start up.
   @CmdLine(name = "reconciliation_initial_delay",
       help = "Initial amount of time to delay task reconciliation after scheduler start up.")
   private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY =
-      Arg.create(Amount.of(0L, Time.MINUTES));
+      Arg.create(Amount.of(1L, Time.MINUTES));
 
   @Positive
   @CmdLine(name = "reconciliation_explicit_interval",

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index 4f7a1be..33749de 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -32,7 +32,8 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 
 import org.apache.aurora.GuiceUtils.AllowUnchecked;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
@@ -67,8 +68,8 @@ import static org.apache.mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
  */
 @VisibleForTesting
 public class MesosSchedulerImpl implements Scheduler {
-  private final List<TaskLauncher> taskLaunchers;
-
+  private final TaskStatusHandler taskStatusHandler;
+  private final OfferManager offerManager;
   private final Storage storage;
   private final Lifecycle lifecycle;
   private final EventSink eventSink;
@@ -90,11 +91,8 @@ public class MesosSchedulerImpl implements Scheduler {
    *
    * @param storage Store to save host attributes into.
    * @param lifecycle Application lifecycle manager.
-   * @param taskLaunchers Task launchers, which will be used in order.  Calls to
-   *                      {@link TaskLauncher#willUse(HostOffer)} and
-   *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
-   *                      launchers, ceasing after the first match (based on a return value of
-   *                      {@code true}.
+   * @param taskStatusHandler Task status update manager.
+   * @param offerManager Offer manager.
    * @param eventSink Pubsub sink to send driver status changes to.
    * @param executor Executor for async work
    */
@@ -102,7 +100,8 @@ public class MesosSchedulerImpl implements Scheduler {
   public MesosSchedulerImpl(
       Storage storage,
       final Lifecycle lifecycle,
-      List<TaskLauncher> taskLaunchers,
+      TaskStatusHandler taskStatusHandler,
+      OfferManager offerManager,
       EventSink eventSink,
       @SchedulerExecutor Executor executor,
       Logger log,
@@ -110,7 +109,8 @@ public class MesosSchedulerImpl implements Scheduler {
 
     this.storage = requireNonNull(storage);
     this.lifecycle = requireNonNull(lifecycle);
-    this.taskLaunchers = requireNonNull(taskLaunchers);
+    this.taskStatusHandler = requireNonNull(taskStatusHandler);
+    this.offerManager = requireNonNull(offerManager);
     this.eventSink = requireNonNull(eventSink);
     this.executor = requireNonNull(executor);
     this.log = requireNonNull(log);
@@ -174,11 +174,7 @@ public class MesosSchedulerImpl implements Scheduler {
                 log.log(Level.FINE, String.format("Received offer: %s", offer));
               }
               counters.get("scheduler_resource_offers").incrementAndGet();
-              for (TaskLauncher launcher : taskLaunchers) {
-                if (launcher.willUse(new HostOffer(offer, attributes))) {
-                  break;
-                }
-              }
+              offerManager.addOffer(new HostOffer(offer, attributes));
             }
           }
         });
@@ -189,9 +185,7 @@ public class MesosSchedulerImpl implements Scheduler {
   @Override
   public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
     log.info("Offer rescinded: " + offerId);
-    for (TaskLauncher launcher : taskLaunchers) {
-      launcher.cancelOffer(offerId);
-    }
+    offerManager.cancelOffer(offerId);
   }
 
   private static void logStatusUpdate(Logger logger, TaskStatus status) {
@@ -236,24 +230,13 @@ public class MesosSchedulerImpl implements Scheduler {
         Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS)));
 
     try {
-      for (TaskLauncher launcher : taskLaunchers) {
-        if (launcher.statusUpdate(status)) {
-          // The launcher is responsible for acknowledging the update.
-          return;
-        }
-      }
+      // The status handler is responsible for acknowledging the update.
+      taskStatusHandler.statusUpdate(status);
     } catch (SchedulerException e) {
       log.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
       // We re-throw the exception here to trigger an abort of the driver.
       throw e;
     }
-
-    log.warning("Unhandled status update " + status);
-    counters.get("scheduler_status_updates").incrementAndGet();
-
-    // Even though we have not handled this update, we acknowledge it to prevent an unbounded
-    // growth of status update retries from Mesos.
-    driver.acknowledgeStatusUpdate(status);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java b/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java
new file mode 100644
index 0000000..9d1e251
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/TaskStatusHandlerImplTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.TaskStatusHandlerImpl.statName;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TaskStatusHandlerImplTest extends EasyMockTest {
+
+  private static final String TASK_ID_A = "task_id_a";
+
+  private StateManager stateManager;
+  private StorageTestUtil storageUtil;
+  private Driver driver;
+  private BlockingQueue<TaskStatus> queue;
+  private FakeStatsProvider stats;
+
+  private TaskStatusHandlerImpl statusHandler;
+
+  @Before
+  public void setUp() {
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    driver = createMock(Driver.class);
+    queue = new LinkedBlockingQueue<>();
+    stats = new FakeStatsProvider();
+
+    statusHandler = new TaskStatusHandlerImpl(
+        storageUtil.storage,
+        stateManager,
+        driver,
+        queue,
+        1000,
+        new CachedCounters(stats));
+
+    statusHandler.startAsync();
+  }
+
+  @After
+  public void after() {
+    statusHandler.stopAsync();
+  }
+
+  @Test
+  public void testForwardsStatusUpdates() throws Exception {
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_RUNNING)
+        .setReason(TaskStatus.Reason.REASON_RECONCILIATION)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setMessage("fake message")
+        .build();
+
+    storageUtil.expectWrite();
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
+        RUNNING,
+        Optional.of("fake message")))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.acknowledgeStatusUpdate(status);
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
+    control.replay();
+
+    statusHandler.statusUpdate(status);
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
+    assertEquals(1L, stats.getValue(statName(status, StateChangeResult.SUCCESS)));
+  }
+
+  @Test
+  public void testFailedStatusUpdate() throws Exception {
+    storageUtil.expectWrite();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
+        RUNNING,
+        Optional.of("fake message")))
+        .andAnswer(() -> {
+            latch.countDown();
+            throw new StorageException("Injected error");
+          });
+
+    control.replay();
+
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_RUNNING)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setMessage("fake message")
+        .build();
+
+    statusHandler.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testMemoryLimitTranslation() throws Exception {
+    storageUtil.expectWrite();
+
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_FAILED)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setReason(TaskStatus.Reason.REASON_MEMORY_LIMIT)
+        .setMessage("Some Message")
+        .build();
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID_A,
+        Optional.absent(),
+        FAILED,
+        Optional.of(TaskStatusHandlerImpl.MEMORY_LIMIT_DISPLAY)))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.acknowledgeStatusUpdate(status);
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
+    control.replay();
+
+    statusHandler.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testThreadFailure() throws Exception {
+    // Re-create the objects from @Before, since we need to inject a mock queue.
+    statusHandler.stopAsync();
+    statusHandler.awaitTerminated();
+
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    driver = createMock(Driver.class);
+    queue = createMock(BlockingQueue.class);
+
+    statusHandler = new TaskStatusHandlerImpl(
+        storageUtil.storage,
+        stateManager,
+        driver,
+        queue,
+        1000,
+        new CachedCounters(stats));
+
+    expect(queue.add(EasyMock.<TaskStatus>anyObject()))
+        .andReturn(true);
+
+    expect(queue.take())
+        .andAnswer(() -> {
+            throw new RuntimeException();
+          });
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.abort();
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
+    control.replay();
+
+    statusHandler.startAsync();
+
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_RUNNING)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setMessage("fake message")
+        .build();
+
+    statusHandler.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
deleted file mode 100644
index 126001a..0000000
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Optional;
-import com.twitter.common.collections.Pair;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.mesos.Offers;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.UserTaskLauncher.statName;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class UserTaskLauncherTest extends EasyMockTest {
-
-  private static final String TASK_ID_A = "task_id_a";
-
-  private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build();
-  private static final HostOffer OFFER = new HostOffer(
-      Offers.createOffer(4, 1024, 1024, Pair.of(80, 80)),
-      IHostAttributes.build(new HostAttributes()));
-
-  private OfferManager offerManager;
-  private StateManager stateManager;
-  private StorageTestUtil storageUtil;
-  private Driver driver;
-  private BlockingQueue<TaskStatus> queue;
-  private FakeStatsProvider stats;
-
-  private UserTaskLauncher launcher;
-
-  @Before
-  public void setUp() {
-    offerManager = createMock(OfferManager.class);
-    stateManager = createMock(StateManager.class);
-    storageUtil = new StorageTestUtil(this);
-    driver = createMock(Driver.class);
-    queue = new LinkedBlockingQueue<>();
-    stats = new FakeStatsProvider();
-
-    launcher = new UserTaskLauncher(
-        storageUtil.storage,
-        offerManager,
-        stateManager,
-        driver,
-        queue,
-        1000,
-        new CachedCounters(stats));
-
-    launcher.startAsync();
-  }
-
-  @After
-  public void after() {
-    launcher.stopAsync();
-  }
-
-  @Test
-  public void testForwardsOffers() throws Exception {
-    offerManager.addOffer(OFFER);
-
-    control.replay();
-
-    assertTrue(launcher.willUse(OFFER));
-  }
-
-  @Test
-  public void testForwardsStatusUpdates() throws Exception {
-    TaskStatus status = TaskStatus.newBuilder()
-        .setState(TaskState.TASK_RUNNING)
-        .setReason(TaskStatus.Reason.REASON_RECONCILIATION)
-        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
-        .setMessage("fake message")
-        .build();
-
-    storageUtil.expectWrite();
-
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID_A,
-        Optional.<ScheduleStatus>absent(),
-        RUNNING,
-        Optional.of("fake message")))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    driver.acknowledgeStatusUpdate(status);
-    expectLastCall().andAnswer(() -> {
-        latch.countDown();
-        return null;
-      });
-
-    control.replay();
-
-    assertTrue(launcher.statusUpdate(status));
-    assertTrue(latch.await(5L, TimeUnit.SECONDS));
-    assertEquals(1L, stats.getValue(statName(status, StateChangeResult.SUCCESS)));
-  }
-
-  @Test
-  public void testForwardsRescindedOffers() throws Exception {
-    launcher.cancelOffer(OFFER_ID);
-
-    control.replay();
-
-    launcher.cancelOffer(OFFER_ID);
-  }
-
-  @Test
-  public void testFailedStatusUpdate() throws Exception {
-    storageUtil.expectWrite();
-
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID_A,
-        Optional.<ScheduleStatus>absent(),
-        RUNNING,
-        Optional.of("fake message")))
-        .andAnswer(() -> {
-            latch.countDown();
-            throw new StorageException("Injected error");
-          });
-
-    control.replay();
-
-    TaskStatus status = TaskStatus.newBuilder()
-        .setState(TaskState.TASK_RUNNING)
-        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
-        .setMessage("fake message")
-        .build();
-
-    launcher.statusUpdate(status);
-
-    assertTrue(latch.await(5L, TimeUnit.SECONDS));
-  }
-
-  @Test
-  public void testMemoryLimitTranslation() throws Exception {
-    storageUtil.expectWrite();
-
-    TaskStatus status = TaskStatus.newBuilder()
-        .setState(TaskState.TASK_FAILED)
-        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
-        .setReason(TaskStatus.Reason.REASON_MEMORY_LIMIT)
-        .setMessage("Some Message")
-        .build();
-
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID_A,
-        Optional.absent(),
-        FAILED,
-        Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    driver.acknowledgeStatusUpdate(status);
-    expectLastCall().andAnswer(() -> {
-        latch.countDown();
-        return null;
-      });
-
-    control.replay();
-
-    launcher.statusUpdate(status);
-
-    assertTrue(latch.await(5L, TimeUnit.SECONDS));
-  }
-
-  @Test
-  public void testThreadFailure() throws Exception {
-    // Re-create the objects from @Before, since we need to inject a mock queue.
-    launcher.stopAsync();
-    launcher.awaitTerminated();
-
-    offerManager = createMock(OfferManager.class);
-    stateManager = createMock(StateManager.class);
-    storageUtil = new StorageTestUtil(this);
-    driver = createMock(Driver.class);
-    queue = createMock(BlockingQueue.class);
-
-    launcher = new UserTaskLauncher(
-        storageUtil.storage,
-        offerManager,
-        stateManager,
-        driver,
-        queue,
-        1000,
-        new CachedCounters(stats));
-
-    expect(queue.add(EasyMock.<TaskStatus>anyObject()))
-        .andReturn(true);
-
-    expect(queue.take())
-        .andAnswer(() -> {
-            throw new RuntimeException();
-          });
-
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    driver.abort();
-    expectLastCall().andAnswer(() -> {
-        latch.countDown();
-        return null;
-      });
-
-    control.replay();
-
-    launcher.startAsync();
-
-    TaskStatus status = TaskStatus.newBuilder()
-        .setState(TaskState.TASK_RUNNING)
-        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
-        .setMessage("fake message")
-        .build();
-
-    launcher.statusUpdate(status);
-
-    assertTrue(latch.await(5L, TimeUnit.SECONDS));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b8c34c0/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index e4e1587..f08c799 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -32,7 +32,8 @@ import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.TaskStatusHandler;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -44,7 +45,6 @@ import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
@@ -65,6 +65,7 @@ import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertTrue;
 
 public class MesosSchedulerImplTest extends EasyMockTest {
@@ -140,8 +141,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
   private Logger log;
   private StorageTestUtil storageUtil;
   private Command shutdownCommand;
-  private TaskLauncher systemLauncher;
-  private TaskLauncher userLauncher;
+  private TaskStatusHandler statusHandler;
+  private OfferManager offerManager;
   private SchedulerDriver driver;
   private EventSink eventSink;
 
@@ -159,14 +160,15 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     shutdownCommand = createMock(Command.class);
     final Lifecycle lifecycle =
         new Lifecycle(shutdownCommand, createMock(UncaughtExceptionHandler.class));
-    systemLauncher = createMock(TaskLauncher.class);
-    userLauncher = createMock(TaskLauncher.class);
+    statusHandler = createMock(TaskStatusHandler.class);
+    offerManager = createMock(OfferManager.class);
     eventSink = createMock(EventSink.class);
 
     scheduler = new MesosSchedulerImpl(
         storageUtil.storage,
         lifecycle,
-        ImmutableList.of(systemLauncher, userLauncher),
+        statusHandler,
+        offerManager,
         eventSink,
         MoreExecutors.sameThreadExecutor(),
         logger,
@@ -193,48 +195,29 @@ public class MesosSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testNoAccepts() {
+  public void testAcceptOffer() {
     new OfferFixture() {
       @Override
       void respondToOffer() {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(false);
+        offerManager.addOffer(OFFER);
       }
     }.run();
   }
 
   @Test
-  public void testOfferFirstAccepts() {
-    new OfferFixture() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(true);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testOfferFirstAcceptsFineLogging() {
-    log.setLevel(Level.FINE);
-    new OfferFixture() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(true);
-      }
-    }.run();
-  }
+  public void testAcceptOfferFineLogging() {
+    Logger mockLogger = createMock(Logger.class);
+    mockLogger.info(anyString());
+    expect(mockLogger.isLoggable(Level.FINE)).andReturn(true);
+    mockLogger.log(eq(Level.FINE), anyString());
+    initializeScheduler(mockLogger);
 
-  @Test
-  public void testOfferSchedulerAccepts() {
     new OfferFixture() {
       @Override
       void respondToOffer() {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(true);
+        offerManager.addOffer(OFFER);
       }
     }.run();
   }
@@ -253,67 +236,24 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
 
         HostOffer offer = new HostOffer(OFFER.getOffer(), draining);
-        expect(systemLauncher.willUse(offer)).andReturn(false);
-        expect(userLauncher.willUse(offer)).andReturn(true);
+        offerManager.addOffer(offer);
       }
     }.run();
   }
 
   @Test
-  public void testStatusUpdateNoAccepts() {
-    new StatusFixture() {
-      @Override
-      void expectations() {
-        eventSink.post(PUBSUB_EVENT);
-        expect(systemLauncher.statusUpdate(status)).andReturn(false);
-        expect(userLauncher.statusUpdate(status)).andReturn(false);
-        expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING);
-      }
-    }.run();
-  }
-
-  private class FirstLauncherAccepts extends StatusFixture {
-    FirstLauncherAccepts(TaskStatus status) {
-      super(status);
-    }
-
-    @Override
-    void expectations() {
-      eventSink.post(new TaskStatusReceived(
-          status.getState(),
-          Optional.fromNullable(status.getSource()),
-          Optional.fromNullable(status.getReason()),
-          Optional.of(1000000L)
-      ));
-      expect(systemLauncher.statusUpdate(status)).andReturn(true);
-    }
-  }
-
-  @Test
-  public void testStatusUpdateFirstAccepts() {
+  public void testStatusUpdate() {
     // Test multiple variations of fields in TaskStatus to cover all branches.
-    new FirstLauncherAccepts(STATUS).run();
+    new StatusUpdater(STATUS).run();
     control.verify();
     control.reset();
-    new FirstLauncherAccepts(STATUS.toBuilder().clearSource().build()).run();
+    new StatusUpdater(STATUS.toBuilder().clearSource().build()).run();
     control.verify();
     control.reset();
-    new FirstLauncherAccepts(STATUS.toBuilder().clearReason().build()).run();
+    new StatusUpdater(STATUS.toBuilder().clearReason().build()).run();
     control.verify();
     control.reset();
-    new FirstLauncherAccepts(STATUS.toBuilder().clearMessage().build()).run();
-  }
-
-  @Test
-  public void testStatusUpdateSecondAccepts() {
-    new StatusFixture() {
-      @Override
-      void expectations() {
-        eventSink.post(PUBSUB_EVENT);
-        expect(systemLauncher.statusUpdate(status)).andReturn(false);
-        expect(userLauncher.statusUpdate(status)).andReturn(true);
-      }
-    }.run();
+    new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run();
   }
 
   @Test(expected = SchedulerException.class)
@@ -322,8 +262,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override
       void expectations() {
         eventSink.post(PUBSUB_EVENT);
-        expect(systemLauncher.statusUpdate(status)).andReturn(false);
-        expect(userLauncher.statusUpdate(status)).andThrow(new StorageException("Injected."));
+        statusHandler.statusUpdate(status);
+        expectLastCall().andThrow(new StorageException("Injected."));
       }
     }.run();
   }
@@ -335,10 +275,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       void expectations() {
         expectOfferAttributesSaved(OFFER);
         expectOfferAttributesSaved(OFFER_2);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(true);
-        expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
-        expect(userLauncher.willUse(OFFER_2)).andReturn(false);
+        offerManager.addOffer(OFFER);
+        offerManager.addOffer(OFFER_2);
       }
 
       @Override
@@ -390,8 +328,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
 
   @Test
   public void testOfferRescinded() {
-    systemLauncher.cancelOffer(OFFER_ID);
-    userLauncher.cancelOffer(OFFER_ID);
+    offerManager.cancelOffer(OFFER_ID);
 
     control.replay();
 
@@ -425,12 +362,28 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override
       void expectations() {
         eventSink.post(PUBSUB_RECONCILIATION_EVENT);
-        expect(systemLauncher.statusUpdate(status)).andReturn(false);
-        expect(userLauncher.statusUpdate(status)).andReturn(true);
+        statusHandler.statusUpdate(status);
       }
     }.run();
   }
 
+  private class StatusUpdater extends StatusFixture {
+    StatusUpdater(TaskStatus status) {
+      super(status);
+    }
+
+    @Override
+    void expectations() {
+      eventSink.post(new TaskStatusReceived(
+          status.getState(),
+          Optional.fromNullable(status.getSource()),
+          Optional.fromNullable(status.getReason()),
+          Optional.of(1000000L)
+      ));
+      statusHandler.statusUpdate(status);
+    }
+  }
+
   private void expectOfferAttributesSaved(HostOffer offer) {
     expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
         .andReturn(Optional.<IHostAttributes>absent());


Mime
View raw message