aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Updated scheduler to process status updates asynchronously in batches.
Date Thu, 14 May 2015 01:11:59 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 2dc1d59f1 -> b3f8da3ed


Updated scheduler to process status updates asynchronously in batches.

Bugs closed: AURORA-1228

Reviewed at https://reviews.apache.org/r/33689/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b3f8da3e
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b3f8da3e
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b3f8da3e

Branch: refs/heads/master
Commit: b3f8da3edaf8eb822afe0ff7d5d6a129959e5069
Parents: 2dc1d59
Author: Ben Mahler <benjamin.mahler@gmail.com>
Authored: Wed May 13 17:57:18 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Wed May 13 17:57:18 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/StatusUpdateBenchmark.java |  42 ++++-
 .../aurora/benchmark/fakes/FakeDriver.java      |  10 ++
 .../aurora/scheduler/SchedulerModule.java       |  22 ++-
 .../aurora/scheduler/UserTaskLauncher.java      | 165 +++++++++++++++----
 .../scheduler/async/GcExecutorLauncher.java     |   1 +
 .../apache/aurora/scheduler/mesos/Driver.java   |  13 ++
 .../scheduler/mesos/MesosSchedulerImpl.java     |   9 +-
 .../scheduler/mesos/SchedulerDriverService.java |  11 ++
 .../aurora/scheduler/UserTaskLauncherTest.java  | 151 ++++++++++++++---
 .../scheduler/async/GcExecutorLauncherTest.java |  18 +-
 .../scheduler/mesos/MesosSchedulerImplTest.java |   2 -
 11 files changed, 373 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 7bb64dd..4c63cc7 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -15,8 +15,12 @@ package org.apache.aurora.benchmark;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
 
 import javax.inject.Singleton;
 
@@ -29,6 +33,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 
 import com.twitter.common.application.ShutdownStage;
 import com.twitter.common.base.Command;
@@ -38,6 +43,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 import com.twitter.common.util.testing.FakeClock;
 
+import org.apache.aurora.benchmark.fakes.FakeDriver;
 import org.apache.aurora.benchmark.fakes.FakeOfferManager;
 import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
 import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver;
@@ -49,14 +55,16 @@ import org.apache.aurora.scheduler.UserTaskLauncher;
 import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
-import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
+import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
@@ -75,6 +83,7 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 
@@ -154,8 +163,8 @@ public class StatusUpdateBenchmark {
   @Param({"5", "25", "100"})
   private long latencyMilliseconds;
 
-  private SchedulerDriver driver;
   private Scheduler scheduler;
+  private UserTaskLauncher userTaskLauncher;
   private SlowStorageWrapper storage;
   private EventBus eventBus;
   private Set<IScheduledTask> tasks;
@@ -171,10 +180,17 @@ public class StatusUpdateBenchmark {
 
     Injector injector = Guice.createInjector(
         new StateModule(),
-        new SchedulerDriverModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
+            bind(Driver.class).toInstance(new FakeDriver());
+            bind(Scheduler.class).to(MesosSchedulerImpl.class);
+            bind(MesosSchedulerImpl.class).in(Singleton.class);
+            bind(Executor.class)
+                .annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+                .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
+                    "SchedulerImpl-%d",
+                    Logger.getLogger(StatusUpdateBenchmark.class.getName())));
             bind(DriverFactory.class).toInstance(new DriverFactory() {
               @Override
               public SchedulerDriver create(
@@ -226,13 +242,19 @@ public class StatusUpdateBenchmark {
                 eventBus.post(event);
               }
             });
+            bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { })
+                .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class)
+                .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>());
+            bind(new TypeLiteral<Integer>() { })
+                .annotatedWith(UserTaskLauncher.MaxBatchSize.class)
+                .toInstance(1000);
+            bind(UserTaskLauncher.class).in(Singleton.class);
           }
 
           @Provides
           @Singleton
-          List<TaskLauncher> provideTaskLaunchers(
-              UserTaskLauncher userTaskLauncher) {
-            return ImmutableList.<TaskLauncher>of(userTaskLauncher);
+          List<TaskLauncher> provideTaskLaunchers(UserTaskLauncher launcher) {
+            return ImmutableList.<TaskLauncher>of(launcher);
           }
         }
     );
@@ -240,6 +262,14 @@ public class StatusUpdateBenchmark {
     eventBus.register(injector.getInstance(ClusterStateImpl.class));
     scheduler = injector.getInstance(Scheduler.class);
     eventBus.register(this);
+
+    userTaskLauncher = injector.getInstance(UserTaskLauncher.class);
+    userTaskLauncher.startAsync();
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() {
+    userTaskLauncher.stopAsync();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
index 45de15a..316ab1c 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
@@ -40,6 +40,16 @@ public class FakeDriver extends AbstractIdleService implements Driver {
   }
 
   @Override
+  public void acknowledgeStatusUpdate(Protos.TaskStatus status) {
+    // no-op
+  }
+
+  @Override
+  public void abort() {
+    // no-op
+  }
+
+  @Override
   protected void startUp() throws Exception {
     // no-op
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index d3ac176..6edec22 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -14,6 +14,8 @@
 package org.apache.aurora.scheduler;
 
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
 
@@ -23,8 +25,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
@@ -33,6 +38,7 @@ import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.mesos.Protos;
 
 /**
  * Binding module for top-level scheduling logic.
@@ -51,10 +57,14 @@ public class SchedulerModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
       Arg.create(Amount.of(1L, Time.DAYS));
 
+  @Positive
+  @CmdLine(name = "max_status_update_batch_size",
+      help = "The maximum number of status updates that can be processed in a batch.")
+  private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = Arg.create(1000);
+
   @Override
   protected void configure() {
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
-    bind(UserTaskLauncher.class).in(Singleton.class);
 
     install(new PrivateModule() {
       @Override
@@ -75,6 +85,16 @@ public class SchedulerModule extends AbstractModule {
     bind(TaskVars.class).in(Singleton.class);
     PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
     SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
+
+    bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { })
+        .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class)
+        .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>());
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(UserTaskLauncher.MaxBatchSize.class)
+        .toInstance(MAX_STATUS_UPDATE_BATCH_SIZE.get());
+
+    bind(UserTaskLauncher.class).in(Singleton.class);
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(UserTaskLauncher.class);
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 0ce9c9d..f1e5dd2 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -13,31 +13,45 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
  * A task launcher that matches resource offers against user tasks.
  */
 @VisibleForTesting
-public class UserTaskLauncher implements TaskLauncher {
+public class UserTaskLauncher extends AbstractExecutionThreadService implements TaskLauncher {
 
   private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
 
@@ -50,12 +64,55 @@ public class UserTaskLauncher implements TaskLauncher {
   private final Storage storage;
   private final OfferManager offerManager;
   private final StateManager stateManager;
+  private final Driver driver;
+  private final BlockingQueue<TaskStatus> pendingUpdates;
+  private final int maxBatchSize;
+
+  private final AtomicReference<Thread> threadReference = new AtomicReference<>();
+
+  /**
+   * Binding annotation for the status update queue.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface StatusUpdateQueue { }
+
+  /**
+   * Binding annotation for the maximum size of a status update batch.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface MaxBatchSize { }
 
   @Inject
-  UserTaskLauncher(Storage storage, OfferManager offerManager, StateManager stateManager) {
+  UserTaskLauncher(
+      Storage storage,
+      OfferManager offerManager,
+      StateManager stateManager,
+      final Driver driver,
+      @StatusUpdateQueue BlockingQueue<TaskStatus> pendingUpdates,
+      @MaxBatchSize Integer maxBatchSize) {
+
     this.storage = requireNonNull(storage);
     this.offerManager = requireNonNull(offerManager);
     this.stateManager = requireNonNull(stateManager);
+    this.driver = requireNonNull(driver);
+    this.pendingUpdates = requireNonNull(pendingUpdates);
+    this.maxBatchSize = requireNonNull(maxBatchSize);
+
+    Stats.exportSize("status_updates_queue_size", this.pendingUpdates);
+
+    addListener(
+        new Listener() {
+          @Override
+          public void failed(State from, Throwable failure) {
+            LOG.log(Level.SEVERE, "UserTaskLauncher failed: ", failure);
+            driver.abort();
+          }
+        },
+        MoreExecutors.sameThreadExecutor());
   }
 
   @Override
@@ -67,38 +124,8 @@ public class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public synchronized boolean statusUpdate(final TaskStatus status) {
-    @Nullable String message = null;
-    if (status.hasMessage()) {
-      message = status.getMessage();
-    }
-
-    try {
-      final ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
-      // TODO(William Farner): Remove this hack once Mesos API change is done.
-      //                       Tracked by: https://issues.apache.org/jira/browse/MESOS-343
-      if (translatedState == ScheduleStatus.FAILED
-          && message != null
-          && message.contains(MEMORY_LIMIT_EXCEEDED)) {
-        message = MEMORY_LIMIT_DISPLAY;
-      }
-
-      final String auditMessage = message;
-      storage.write(new Storage.MutateWork.NoResult.Quiet() {
-        @Override
-        protected void execute(Storage.MutableStoreProvider storeProvider) {
-          stateManager.changeState(
-              storeProvider,
-              status.getTaskId().getValue(),
-              Optional.<ScheduleStatus>absent(),
-              translatedState,
-              Optional.fromNullable(auditMessage));
-        }
-      });
-    } catch (SchedulerException e) {
-      LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
-      throw e;
-    }
+  public boolean statusUpdate(TaskStatus status) {
+    pendingUpdates.add(status);
     return true;
   }
 
@@ -106,4 +133,70 @@ public class UserTaskLauncher implements TaskLauncher {
   public void cancelOffer(OfferID offer) {
     offerManager.cancelOffer(offer);
   }
+
+  @Override
+  protected void triggerShutdown() {
+    Thread thread = threadReference.get();
+
+    if (thread != null) {
+      thread.interrupt();
+    }
+  }
+
+  @Override
+  protected void run() {
+    threadReference.set(Thread.currentThread());
+
+    while (isRunning()) {
+      final Queue<TaskStatus> updates = new ArrayDeque<>();
+
+      try {
+        updates.add(pendingUpdates.take());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+
+      // Process all other available updates, up to the limit on batch size.
+      // TODO(bmahler): Expose histogram metrics of the batch sizes.
+      pendingUpdates.drainTo(updates, maxBatchSize - updates.size());
+
+      try {
+        storage.write(new Storage.MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(Storage.MutableStoreProvider storeProvider) {
+            for (TaskStatus status : updates) {
+              ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+
+              Optional<String> message = Optional.absent();
+              if (status.hasMessage()) {
+                message = Optional.of(status.getMessage());
+              }
+
+              // TODO(William Farner): Remove this hack once Mesos API change is done.
+              //                       Tracked by: https://issues.apache.org/jira/browse/MESOS-343
+              if (translatedState == ScheduleStatus.FAILED
+                  && message.isPresent()
+                  && message.get().contains(MEMORY_LIMIT_EXCEEDED)) {
+                message = Optional.of(MEMORY_LIMIT_DISPLAY);
+              }
+
+              stateManager.changeState(
+                  storeProvider,
+                  status.getTaskId().getValue(),
+                  Optional.<ScheduleStatus>absent(),
+                  translatedState,
+                  message);
+            }
+          }
+        });
+
+        for (TaskStatus status : updates) {
+          driver.acknowledgeStatusUpdate(status);
+        }
+      } catch (RuntimeException e) {
+        LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index 4d589a3..f2ef70d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -230,6 +230,7 @@ public class GcExecutorLauncher implements TaskLauncher {
       if (status.getState() == Protos.TaskState.TASK_LOST) {
         lostTasks.incrementAndGet();
       }
+      driver.acknowledgeStatusUpdate(status);
       return true;
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
index c7e45a8..975ea02 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
@@ -17,6 +17,7 @@ import com.google.common.util.concurrent.Service;
 
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
 
 /**
  * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely:
@@ -51,7 +52,19 @@ public interface Driver extends Service {
   void killTask(String taskId);
 
   /**
+   * Acknowledges the given {@code status} update.
+   *
+   * @param status The status to acknowledge.
+   */
+  void acknowledgeStatusUpdate(TaskStatus status);
+
+  /**
    * Blocks until the driver is no longer active.
    */
   void blockUntilStopped();
+
+  /**
+   * Aborts the driver.
+   */
+  void abort();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index 9b8ab7c..f233d5a 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -63,7 +64,8 @@ import static org.apache.mesos.Protos.Offer;
 /**
  * Location for communication with mesos.
  */
-class MesosSchedulerImpl implements Scheduler {
+@VisibleForTesting
+public class MesosSchedulerImpl implements Scheduler {
   private final List<TaskLauncher> taskLaunchers;
 
   private final Storage storage;
@@ -77,9 +79,10 @@ class MesosSchedulerImpl implements Scheduler {
   /**
    * Binding annotation for the executor the incoming Mesos message handler uses.
    */
+  @VisibleForTesting
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  @interface SchedulerExecutor { }
+  public @interface SchedulerExecutor { }
 
   /**
    * Creates a new scheduler.
@@ -228,7 +231,7 @@ class MesosSchedulerImpl implements Scheduler {
     try {
       for (TaskLauncher launcher : taskLaunchers) {
         if (launcher.statusUpdate(status)) {
-          driver.acknowledgeStatusUpdate(status);
+          // The launcher is responsible for acknowledging the update.
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index da2d5df..35cada6 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -114,6 +114,11 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
   }
 
   @Override
+  public void abort() {
+    Futures.getUnchecked(driverFuture).abort();
+  }
+
+  @Override
   public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) {
     checkState(isRunning(), "Driver is not running.");
     Futures.getUnchecked(driverFuture)
@@ -138,4 +143,10 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
       killFailures.incrementAndGet();
     }
   }
+
+  @Override
+  public void acknowledgeStatusUpdate(Protos.TaskStatus status) {
+    checkState(isRunning(), "Driver is not running.");
+    Futures.getUnchecked(driverFuture).acknowledgeStatusUpdate(status);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 8da488d..f4631c1 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -13,6 +13,11 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.base.Optional;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -20,6 +25,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferManager;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.Offers;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -30,12 +36,15 @@ import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskState;
 import org.apache.mesos.Protos.TaskStatus;
+import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertTrue;
 
 public class UserTaskLauncherTest extends EasyMockTest {
@@ -50,16 +59,33 @@ public class UserTaskLauncherTest extends EasyMockTest {
   private OfferManager offerManager;
   private StateManager stateManager;
   private StorageTestUtil storageUtil;
+  private Driver driver;
+  private BlockingQueue<TaskStatus> queue;
 
-  private TaskLauncher launcher;
+  private UserTaskLauncher launcher;
 
   @Before
   public void setUp() {
     offerManager = createMock(OfferManager.class);
     stateManager = createMock(StateManager.class);
     storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    launcher = new UserTaskLauncher(storageUtil.storage, offerManager, stateManager);
+    driver = createMock(Driver.class);
+    queue = new LinkedBlockingQueue<>();
+
+    launcher = new UserTaskLauncher(
+        storageUtil.storage,
+        offerManager,
+        stateManager,
+        driver,
+        queue,
+        1000);
+
+    launcher.startAsync();
+  }
+
+  @After
+  public void after() {
+    launcher.stopAsync();
   }
 
   @Test
@@ -73,6 +99,14 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
   @Test
   public void testForwardsStatusUpdates() throws Exception {
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_RUNNING)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setMessage("fake message")
+        .build();
+
+    storageUtil.expectWrite();
+
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
         TASK_ID_A,
@@ -81,14 +115,19 @@ public class UserTaskLauncherTest extends EasyMockTest {
         Optional.of("fake message")))
         .andReturn(StateChangeResult.SUCCESS);
 
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.acknowledgeStatusUpdate(status);
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
     control.replay();
 
-    TaskStatus status = TaskStatus.newBuilder()
-        .setState(TaskState.TASK_RUNNING)
-        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
-        .setMessage("fake message")
-        .build();
     assertTrue(launcher.statusUpdate(status));
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
   }
 
   @Test
@@ -100,15 +139,22 @@ public class UserTaskLauncherTest extends EasyMockTest {
     launcher.cancelOffer(OFFER_ID);
   }
 
-  @Test(expected = StorageException.class)
+  @Test
   public void testFailedStatusUpdate() throws Exception {
+    storageUtil.expectWrite();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
     expect(stateManager.changeState(
         storageUtil.mutableStoreProvider,
         TASK_ID_A,
         Optional.<ScheduleStatus>absent(),
         RUNNING,
         Optional.of("fake message")))
-        .andThrow(new StorageException("Injected error"));
+        .andAnswer(() -> {
+            latch.countDown();
+            throw new StorageException("Injected error");
+          });
 
     control.replay();
 
@@ -117,20 +163,15 @@ public class UserTaskLauncherTest extends EasyMockTest {
         .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
         .setMessage("fake message")
         .build();
+
     launcher.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
   }
 
   @Test
   public void testMemoryLimitTranslationHack() throws Exception {
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID_A,
-        Optional.<ScheduleStatus>absent(),
-        FAILED,
-        Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
-        .andReturn(StateChangeResult.ILLEGAL);
-
-    control.replay();
+    storageUtil.expectWrite();
 
     TaskStatus status = TaskStatus.newBuilder()
         .setState(TaskState.TASK_FAILED)
@@ -159,6 +200,78 @@ public class UserTaskLauncherTest extends EasyMockTest {
             + "total_active_file 700416\n"
             + "total_unevictable 0 ")
         .build();
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
+        FAILED,
+        Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
+        .andReturn(StateChangeResult.ILLEGAL);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.acknowledgeStatusUpdate(status);
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
+    control.replay();
+
+    launcher.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testThreadFailure() throws Exception {
+    // Re-create the objects from @Before, since we need to inject a mock queue.
+    launcher.stopAsync();
+    launcher.awaitTerminated();
+
+    offerManager = createMock(OfferManager.class);
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    driver = createMock(Driver.class);
+    queue = createMock(BlockingQueue.class);
+
+    launcher = new UserTaskLauncher(
+        storageUtil.storage,
+        offerManager,
+        stateManager,
+        driver,
+        queue,
+        1000);
+
+    expect(queue.add(EasyMock.<TaskStatus>anyObject()))
+        .andReturn(true);
+
+    expect(queue.take())
+        .andAnswer(() -> {
+            throw new RuntimeException();
+          });
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    driver.abort();
+    expectLastCall().andAnswer(() -> {
+        latch.countDown();
+        return null;
+      });
+
+    control.replay();
+
+    launcher.startAsync();
+
+    TaskStatus status = TaskStatus.newBuilder()
+        .setState(TaskState.TASK_RUNNING)
+        .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A))
+        .setMessage("fake message")
+        .build();
+
     launcher.statusUpdate(status);
+
+    assertTrue(latch.await(5L, TimeUnit.SECONDS));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 422d5a9..d2ec944 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -191,15 +191,25 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   @Test
   public void testStatusUpdate() {
+    TaskStatus gcStatus1 = makeStatus(SYSTEM_TASK_PREFIX);
+    TaskStatus gcStatus2 = makeStatus(SYSTEM_TASK_PREFIX + "1");
+    TaskStatus lost = makeStatus(SYSTEM_TASK_PREFIX).toBuilder()
+        .setState(TaskState.TASK_LOST).build();
+
+    driver.acknowledgeStatusUpdate(gcStatus1);
+    driver.acknowledgeStatusUpdate(gcStatus2);
+    driver.acknowledgeStatusUpdate(lost);
+
     replayAndConstruct();
 
-    assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX)));
-    assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX + "1")));
+    assertTrue(gcExecutorLauncher.statusUpdate(gcStatus1));
+    assertTrue(gcExecutorLauncher.statusUpdate(gcStatus2));
+
     assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("1" + SYSTEM_TASK_PREFIX)));
     assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("asdf")));
+
     assertEquals(0, lostTasks.get());
-    assertTrue(gcExecutorLauncher.statusUpdate(
-        makeStatus(SYSTEM_TASK_PREFIX).toBuilder().setState(TaskState.TASK_LOST).build()));
+    assertTrue(gcExecutorLauncher.statusUpdate(lost));
     assertEquals(1, lostTasks.get());
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index abdeee4..f0f9ac3 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -268,7 +268,6 @@ public class MesosSchedulerImplTest extends EasyMockTest {
           Optional.of(1000000L)
       ));
       expect(systemLauncher.statusUpdate(status)).andReturn(true);
-      expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING);
     }
   }
 
@@ -295,7 +294,6 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         eventSink.post(PUBSUB_EVENT);
         expect(systemLauncher.statusUpdate(status)).andReturn(false);
         expect(userLauncher.statusUpdate(status)).andReturn(true);
-        expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING);
       }
     }.run();
   }


Mime
View raw message