aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Prepare and launch GC executor tasks asynchronously.
Date Sat, 22 Feb 2014 03:24:38 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master f679dfb54 -> f106eab88


Prepare and launch GC executor tasks asynchronously.

Bugs closed: AURORA-214

Reviewed at https://reviews.apache.org/r/18141/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f106eab8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f106eab8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f106eab8

Branch: refs/heads/master
Commit: f106eab88e70009371678015f7388761790e9eec
Parents: f679dfb
Author: Bill Farner <wfarner@apache.org>
Authored: Fri Feb 21 19:21:43 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Fri Feb 21 19:21:43 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/MesosSchedulerImpl.java    |  36 +--
 .../aurora/scheduler/SchedulerModule.java       |  17 +-
 .../apache/aurora/scheduler/TaskLauncher.java   |  10 +-
 .../aurora/scheduler/UserTaskLauncher.java      |   5 +-
 .../aurora/scheduler/async/AsyncModule.java     |  24 ++
 .../scheduler/async/GcExecutorLauncher.java     | 269 +++++++++++++++++++
 .../scheduler/periodic/GcExecutorLauncher.java  | 194 -------------
 .../scheduler/MesosSchedulerImplTest.java       |  51 +---
 .../aurora/scheduler/UserTaskLauncherTest.java  |   3 +-
 .../scheduler/async/GcExecutorLauncherTest.java | 199 ++++++++++++++
 .../periodic/GcExecutorLauncherTest.java        | 177 ------------
 11 files changed, 517 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
index 70ac62e..772511f 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -22,9 +22,7 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.twitter.common.application.Lifecycle;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
@@ -34,7 +32,6 @@ import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.gen.comm.SchedulerMessage;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
@@ -48,7 +45,6 @@ import org.apache.mesos.Protos.MasterInfo;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskStatus;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
@@ -62,7 +58,6 @@ class MesosSchedulerImpl implements Scheduler {
   private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
 
   private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
-  private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
   private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
   private final AtomicLong frameworkDisconnects =
       Stats.exportLong("scheduler_framework_disconnects");
@@ -83,7 +78,11 @@ class MesosSchedulerImpl implements Scheduler {
    *
    * @param schedulerCore Core scheduler.
    * @param lifecycle Application lifecycle manager.
-   * @param taskLaunchers Task launchers.
+   * @param taskLaunchers Task launchers, which will be used in order.  Calls to
+   *                      {@link TaskLauncher#willUse(Offer)} and
+   *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
+   *                      launchers, ceasing after the first match (based on a return value of
+   *                      {@code true}.
    */
   @Inject
   public MesosSchedulerImpl(
@@ -136,10 +135,6 @@ class MesosSchedulerImpl implements Scheduler {
     frameworkReregisters.incrementAndGet();
   }
 
-  private static boolean fitsInOffer(TaskInfo task, Offer offer) {
-    return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
-  }
-
   @Timed("scheduler_resource_offers")
   @Override
   public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
@@ -164,26 +159,9 @@ class MesosSchedulerImpl implements Scheduler {
     for (Offer offer : offers) {
       log(Level.FINE, "Received offer: %s", offer);
       resourceOffers.incrementAndGet();
-
-      // Ordering of task launchers is important here, since offers are consumed greedily.
-      // TODO(William Farner): Refactor this area of code now that the primary task launcher
-      // is asynchronous.
       for (TaskLauncher launcher : taskLaunchers) {
-        Optional<TaskInfo> task = Optional.absent();
-        try {
-          task = launcher.createTask(offer);
-        } catch (SchedulerException e) {
-          LOG.log(Level.WARNING, "Failed to schedule offers.", e);
-          failedOffers.incrementAndGet();
-        }
-
-        if (task.isPresent()) {
-          if (fitsInOffer(task.get(), offer)) {
-            driver.launchTasks(offer.getId(), ImmutableList.of(task.get()));
-            break;
-          } else {
-            LOG.warning("Insufficient resources to launch task " + task);
-          }
+        if (launcher.willUse(offer)) {
+          break;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 92399cc..0092372 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Singleton;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
@@ -36,9 +35,8 @@ import org.apache.aurora.scheduler.Driver.DriverImpl;
 import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import org.apache.aurora.scheduler.async.GcExecutorLauncher;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.periodic.GcExecutorLauncher;
-import org.apache.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.mesos.Scheduler;
 
 /**
@@ -46,14 +44,6 @@ import org.apache.mesos.Scheduler;
  */
 public class SchedulerModule extends AbstractModule {
 
-  @CmdLine(name = "executor_gc_interval",
-      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
-  private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
-  private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
-
   @CmdLine(name = "max_registration_delay",
       help = "Max allowable delay to allow the driver to register before aborting")
   private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
@@ -75,11 +65,6 @@ public class SchedulerModule extends AbstractModule {
 
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
 
-    bind(GcExecutorSettings.class).toInstance(new GcExecutorSettings(
-        EXECUTOR_GC_INTERVAL.get(),
-        Optional.fromNullable(GC_EXECUTOR_PATH.get())));
-
-    bind(GcExecutorLauncher.class).in(Singleton.class);
     bind(UserTaskLauncher.class).in(Singleton.class);
 
     install(new PrivateModule() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
index 96a3ade..5ce5453 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -15,11 +15,8 @@
  */
 package org.apache.aurora.scheduler;
 
-import com.google.common.base.Optional;
-
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskStatus;
 
 /**
@@ -28,16 +25,17 @@ import org.apache.mesos.Protos.TaskStatus;
 public interface TaskLauncher {
 
   /**
-   * Grants a resource offer to the task launcher, which will be passed to any subsequent task
+   * Presents a resource offer to the task launcher, which will be passed to any subsequent task
    * launchers if this one does not accept.
    * <p>
    * A task launcher may choose to retain an offer for later use.  Any retained offers must be
    * cleaned up with {@link #cancelOffer(OfferID)}.
    *
    * @param offer The resource offer.
-   * @return A task, absent if the launcher chooses not to accept the offer.
+   * @return {@code false} if the launcher will not act on the offer, or {@code true} if the
+   *         launcher may accept the offer at some point in the future.
    */
-  Optional<TaskInfo> createTask(Offer offer);
+  boolean willUse(Offer offer);
 
   /**
    * Informs the launcher that a status update has been received for a task.  If the task is not

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 010776e..fd26441 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -31,7 +31,6 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskStatus;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -59,11 +58,11 @@ class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public Optional<TaskInfo> createTask(Offer offer) {
+  public boolean willUse(Offer offer) {
     checkNotNull(offer);
 
     offerQueue.addOffer(offer);
-    return Optional.absent();
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index a4a049e..9cc7bfe 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.logging.Logger;
@@ -43,6 +44,8 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Random;
 import com.twitter.common.util.TruncatedBinaryBackoff;
 
+import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
+import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
@@ -152,6 +155,14 @@ public class AsyncModule extends AbstractModule {
   @VisibleForTesting
   static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
 
+  @CmdLine(name = "executor_gc_interval",
+      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
+  private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
+  private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
+
   @Override
   protected void configure() {
     // Don't worry about clean shutdown, these can be daemon and cleanup-free.
@@ -252,6 +263,19 @@ public class AsyncModule extends AbstractModule {
       }
     });
     PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(GcExecutorSettings.class).toInstance(new RandomGcExecutorSettings(
+            EXECUTOR_GC_INTERVAL.get(),
+            Optional.fromNullable(GC_EXECUTOR_PATH.get())));
+        bind(Executor.class).toInstance(executor);
+
+        bind(GcExecutorLauncher.class).in(Singleton.class);
+        expose(GcExecutorLauncher.class);
+      }
+    });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
new file mode 100644
index 0000000..d57b0d4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import org.apache.aurora.Protobufs;
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.Driver;
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.base.CommandUtil;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A task launcher that periodically initiates garbage collection on a host, re-using a single
+ * garbage collection executor
+ */
+public class GcExecutorLauncher implements TaskLauncher {
+  private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
+
+  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
+  private final AtomicLong offersConsumed = Stats.exportLong("scheduler_gc_offers_consumed");
+
+  @VisibleForTesting
+  static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
+      new Resources(0.2, Amount.of(128L, Data.MB), Amount.of(16L, Data.MB), 0);
+
+  // An epsilon is used because we are required to supply executor and task resources.
+  @VisibleForTesting
+  static final Resources EPSILON =
+      new Resources(0.01, Amount.of(1L, Data.MB), Amount.of(1L, Data.MB), 0);
+
+  private static final Resources GC_EXECUTOR_TASK_RESOURCES =
+      Resources.subtract(TOTAL_GC_EXECUTOR_RESOURCES, EPSILON);
+
+  @VisibleForTesting
+  static final String SYSTEM_TASK_PREFIX = "system-gc-";
+  private static final String EXECUTOR_NAME = "aurora.gc";
+
+  private final GcExecutorSettings settings;
+  private final Storage storage;
+  private final Clock clock;
+  private final Executor executor;
+  private final Driver driver;
+  private final Supplier<String> uuidGenerator;
+  private final Cache<String, Long> pulses;
+
+  @Inject
+  GcExecutorLauncher(
+      GcExecutorSettings settings,
+      Storage storage,
+      Clock clock,
+      Executor executor,
+      Driver driver) {
+
+    this(
+        settings,
+        storage,
+        clock,
+        executor,
+        driver,
+        new Supplier<String>() {
+          @Override
+          public String get() {
+            return UUID.randomUUID().toString();
+          }
+        });
+  }
+
+  @VisibleForTesting
+  GcExecutorLauncher(
+      GcExecutorLauncher.GcExecutorSettings settings,
+      Storage storage,
+      Clock clock,
+      Executor executor,
+      Driver driver,
+      Supplier<String> uuidGenerator) {
+
+    this.settings = checkNotNull(settings);
+    this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+    this.executor = checkNotNull(executor);
+    this.driver = checkNotNull(driver);
+    this.uuidGenerator = checkNotNull(uuidGenerator);
+    this.pulses = CacheBuilder.newBuilder()
+        .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  @VisibleForTesting
+  TaskInfo makeGcTask(
+      String sourceName,
+      SlaveID slaveId,
+      AdjustRetainedTasks message) {
+
+    ExecutorInfo.Builder executorInfo = ExecutorInfo.newBuilder()
+        .setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
+        .setName(EXECUTOR_NAME)
+        .setSource(sourceName)
+        .addAllResources(GC_EXECUTOR_TASK_RESOURCES.toResourceList())
+        .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
+
+    byte[] data;
+    try {
+      data = ThriftBinaryCodec.encode(message);
+    } catch (CodingException e) {
+      LOG.severe("Failed to encode retained tasks message: " + message);
+      throw Throwables.propagate(e);
+    }
+
+    return TaskInfo.newBuilder().setName("system-gc")
+        .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + uuidGenerator.get()))
+        .setSlaveId(slaveId)
+        .setData(ByteString.copyFrom(data))
+        .setExecutor(executorInfo)
+        .addAllResources(EPSILON.toResourceList())
+        .build();
+  }
+
+  private TaskInfo makeGcTask(String hostName, SlaveID slaveId) {
+    Set<IScheduledTask> tasksOnHost =
+        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(hostName));
+    AdjustRetainedTasks message = new AdjustRetainedTasks()
+        .setRetainedTasks(Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS));
+    tasksCreated.incrementAndGet();
+    return makeGcTask(hostName, slaveId, message);
+  }
+
+  private boolean sufficientResources(Offer offer) {
+    boolean sufficient = Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
+    if (!sufficient) {
+      LOG.warning("Offer for host " + offer.getHostname() + " is too small for a GC executor");
+    }
+    return sufficient;
+  }
+
+  @Override
+  public boolean willUse(final Offer offer) {
+    if (!settings.getGcExecutorPath().isPresent()
+        || isAlive(offer.getHostname())
+        || !sufficientResources(offer)) {
+
+      return false;
+    }
+
+    pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        driver.launchTask(offer.getId(), makeGcTask(offer.getHostname(), offer.getSlaveId()));
+      }
+    });
+    offersConsumed.incrementAndGet();
+    return true;
+  }
+
+  @Override
+  public boolean statusUpdate(TaskStatus status) {
+    if (status.getTaskId().getValue().startsWith(SYSTEM_TASK_PREFIX)) {
+      LOG.info("Received status update for GC task: " + Protobufs.toString(status));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void cancelOffer(OfferID offer) {
+    // No-op.
+  }
+
+  private boolean isAlive(String hostname) {
+    Optional<Long> timestamp = Optional.fromNullable(pulses.getIfPresent(hostname));
+    return timestamp.isPresent() && clock.nowMillis() < timestamp.get();
+  }
+
+  public static class GcExecutorSettings {
+    protected final Amount<Long, Time> gcInterval;
+    private final Optional<String> gcExecutorPath;
+
+    @VisibleForTesting
+    GcExecutorSettings(Amount<Long, Time> gcInterval, Optional<String> gcExecutorPath) {
+      this.gcInterval = checkNotNull(gcInterval);
+      this.gcExecutorPath = checkNotNull(gcExecutorPath);
+    }
+
+    @VisibleForTesting
+    long getMaxGcInterval() {
+      return gcInterval.as(Time.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    int getDelayMs() {
+      return gcInterval.as(Time.MILLISECONDS).intValue();
+    }
+
+    @VisibleForTesting
+    Optional<String> getGcExecutorPath() {
+      return gcExecutorPath;
+    }
+  }
+
+  /**
+   * Wraps configuration values for the {@code GcExecutorLauncher}.
+   */
+  static class RandomGcExecutorSettings extends GcExecutorSettings {
+    private final Random rand = new Random.SystemRandom(new java.util.Random());
+
+    RandomGcExecutorSettings(Amount<Long, Time> gcInterval, Optional<String> gcExecutorPath) {
+      super(gcInterval, gcExecutorPath);
+    }
+
+    @Override
+    int getDelayMs() {
+      return rand.nextInt(gcInterval.as(Time.MILLISECONDS).intValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
deleted file mode 100644
index f0d4fbc..0000000
--- a/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.periodic;
-
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.Random;
-
-import org.apache.aurora.Protobufs;
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.TaskLauncher;
-import org.apache.aurora.scheduler.base.CommandUtil;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A task launcher that periodically initiates garbage collection on a host, re-using a single
- * garbage collection executor
- */
-public class GcExecutorLauncher implements TaskLauncher {
-  private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
-
-  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
-
-  @VisibleForTesting
-  static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
-      new Resources(0.2, Amount.of(128L, Data.MB), Amount.of(16L, Data.MB), 0);
-
-  // An epsilon is used because we are required to supply executor and task resources.
-  @VisibleForTesting
-  static final Resources EPSILON =
-      new Resources(0.01, Amount.of(1L, Data.MB), Amount.of(1L, Data.MB), 0);
-
-  private static final Resources GC_EXECUTOR_RESOURCES =
-      Resources.subtract(TOTAL_GC_EXECUTOR_RESOURCES, EPSILON);
-
-  private static final String SYSTEM_TASK_PREFIX = "system-gc-";
-  private static final String EXECUTOR_NAME = "aurora.gc";
-
-  private final GcExecutorSettings settings;
-  private final Storage storage;
-  private final Clock clock;
-  private final Cache<String, Long> pulses;
-
-  @Inject
-  GcExecutorLauncher(
-      GcExecutorSettings settings,
-      Storage storage,
-      Clock clock) {
-
-    this.settings = checkNotNull(settings);
-    this.storage = checkNotNull(storage);
-    this.clock = checkNotNull(clock);
-
-    this.pulses = CacheBuilder.newBuilder()
-        .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
-        .build();
-  }
-
-  @Override
-  public Optional<TaskInfo> createTask(Offer offer) {
-    if (!settings.getGcExecutorPath().isPresent()
-        || !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
-        || isAlive(offer.getHostname())) {
-      return Optional.absent();
-    }
-
-    Set<IScheduledTask> tasksOnHost =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(offer.getHostname()));
-    AdjustRetainedTasks message = new AdjustRetainedTasks()
-        .setRetainedTasks(Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS));
-    byte[] data;
-    try {
-      data = ThriftBinaryCodec.encode(message);
-    } catch (CodingException e) {
-      LOG.severe("Failed to encode retained tasks message: " + message);
-      return Optional.absent();
-    }
-
-    tasksCreated.incrementAndGet();
-    pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
-
-    ExecutorInfo.Builder executor = ExecutorInfo.newBuilder()
-        .setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
-        .setName(EXECUTOR_NAME)
-        .setSource(offer.getHostname())
-        .addAllResources(GC_EXECUTOR_RESOURCES.toResourceList())
-        .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
-
-    return Optional.of(TaskInfo.newBuilder().setName("system-gc")
-        .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + UUID.randomUUID().toString()))
-        .setSlaveId(offer.getSlaveId())
-        .setData(ByteString.copyFrom(data))
-        .setExecutor(executor)
-        .addAllResources(EPSILON.toResourceList())
-        .build());
-  }
-
-  @Override
-  public boolean statusUpdate(TaskStatus status) {
-    if (status.getTaskId().getValue().startsWith(SYSTEM_TASK_PREFIX)) {
-      LOG.info("Received status update for GC task: " + Protobufs.toString(status));
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public void cancelOffer(OfferID offer) {
-    // No-op.
-  }
-
-  private boolean isAlive(String hostname) {
-    Optional<Long> timestamp = Optional.fromNullable(pulses.getIfPresent(hostname));
-    return timestamp.isPresent() && clock.nowMillis() < timestamp.get();
-  }
-
-  /**
-   * Wraps configuration values for the {@code GcExecutorLauncher}.
-   */
-  public static class GcExecutorSettings {
-    private final Amount<Long, Time> gcInterval;
-    private final Optional<String> gcExecutorPath;
-    private final Random rand = new Random.SystemRandom(new java.util.Random());
-
-    public GcExecutorSettings(
-        Amount<Long, Time> gcInterval,
-        Optional<String> gcExecutorPath) {
-
-      this.gcInterval = checkNotNull(gcInterval);
-      this.gcExecutorPath = checkNotNull(gcExecutorPath);
-    }
-
-    @VisibleForTesting
-    long getMaxGcInterval() {
-      return gcInterval.as(Time.MILLISECONDS);
-    }
-
-    @VisibleForTesting
-    int getDelayMs() {
-      return rand.nextInt(gcInterval.as(Time.MILLISECONDS).intValue());
-    }
-
-    @VisibleForTesting
-    Optional<String> getGcExecutorPath() {
-      return gcExecutorPath;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
index 92c77d5..7a91ab8 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
@@ -20,7 +20,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.testing.TearDown;
 import com.google.inject.AbstractModule;
@@ -33,7 +32,6 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
@@ -83,23 +81,9 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       .setId(OFFER_ID_2)
       .build();
 
-  private static final TaskID TASK_ID = TaskID.newBuilder().setValue("task-id").build();
-  private static final TaskInfo TASK = TaskInfo.newBuilder()
-      .setName("task-name")
-      .setSlaveId(SLAVE_ID)
-      .setTaskId(TASK_ID)
-      .build();
-
-  private static final TaskInfo BIGGER_TASK = TaskInfo.newBuilder()
-      .setName("task-name")
-      .setSlaveId(SLAVE_ID)
-      .addResources(Resources.makeMesosResource(Resources.CPUS, 5))
-      .setTaskId(TaskID.newBuilder().setValue("task-id"))
-      .build();
-
   private static final TaskStatus STATUS = TaskStatus.newBuilder()
       .setState(TaskState.TASK_RUNNING)
-      .setTaskId(TASK_ID)
+      .setTaskId(TaskID.newBuilder().setValue("task-id").build())
       .build();
 
   private StorageTestUtil storageUtil;
@@ -158,8 +142,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override
       void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(false);
       }
     }.run();
   }
@@ -170,8 +154,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override
       void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
-        expectLaunch(TASK);
+        expect(systemLauncher.willUse(OFFER)).andReturn(true);
       }
     }.run();
   }
@@ -182,21 +165,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override
       void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
-        expectLaunch(TASK);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testAcceptedExceedsOffer() throws Exception {
-    new OfferFixture() {
-      @Override
-      void respondToOffer() throws Exception {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.of(BIGGER_TASK));
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(true);
       }
     }.run();
   }
@@ -251,11 +221,10 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       void expectations() throws Exception {
         expectOfferAttributesSaved(OFFER);
         expectOfferAttributesSaved(OFFER_2);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
-        expectLaunch(TASK);
-        expect(systemLauncher.createTask(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(true);
+        expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
+        expect(userLauncher.willUse(OFFER_2)).andReturn(false);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index ecf4f90..44cb20e 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -50,7 +50,6 @@ import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.HOST_CONSTRAINT;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class UserTaskLauncherTest extends EasyMockTest {
@@ -83,7 +82,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(launcher.createTask(OFFER).isPresent());
+    assertTrue(launcher.willUse(OFFER));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
new file mode 100644
index 0000000..016bed9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -0,0 +1,199 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.Driver;
+import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GcExecutorLauncherTest extends EasyMockTest {
+
+  private static final String HOST = "slave-host";
+
+  private static final Offer OFFER = Offer.newBuilder()
+      .setSlaveId(SlaveID.newBuilder().setValue("slave-id"))
+      .setHostname(HOST)
+      .setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build())
+      .setId(OfferID.newBuilder().setValue("offer-id"))
+      .addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList())
+      .build();
+
+  private static final String JOB_A = "jobA";
+
+  private static final GcExecutorSettings SETTINGS =
+      new GcExecutorSettings(Amount.of(1L, Time.HOURS), Optional.of("nonempty"));
+
+  private final AtomicInteger taskIdCounter = new AtomicInteger();
+
+  private FakeClock clock;
+  private StorageTestUtil storageUtil;
+  private Driver driver;
+  private GcExecutorLauncher gcExecutorLauncher;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    clock = new FakeClock();
+    storageUtil.expectOperations();
+    driver = createMock(Driver.class);
+    gcExecutorLauncher = new GcExecutorLauncher(
+        SETTINGS,
+        storageUtil.storage,
+        clock,
+        MoreExecutors.sameThreadExecutor(),
+        driver,
+        Suppliers.ofInstance("gc"));
+  }
+
+  @Test
+  public void testPruning() throws Exception {
+    IScheduledTask a = makeTask(JOB_A, FAILED);
+    IScheduledTask b = makeTask(JOB_A, FAILED);
+    IScheduledTask c = makeTask(JOB_A, FAILED);
+
+    // First call - no tasks to be collected.
+    expectGetTasksByHost(HOST, a, b, c);
+    expectAdjustRetainedTasks(a, b, c);
+
+    // Third call - two tasks collected.
+    expectGetTasksByHost(HOST, a);
+    expectAdjustRetainedTasks(a);
+
+    control.replay();
+
+    // First call - no items in the cache, no tasks collected.
+    assertTrue(gcExecutorLauncher.willUse(OFFER));
+
+    // Second call - host item alive, no tasks collected.
+    clock.advance(Amount.of((long) SETTINGS.getDelayMs() - 1, Time.MILLISECONDS));
+    assertFalse(gcExecutorLauncher.willUse(OFFER));
+
+    // Third call - two tasks collected.
+    clock.advance(Amount.of(15L, Time.MINUTES));
+    assertTrue(gcExecutorLauncher.willUse(OFFER));
+  }
+
+  @Test
+  public void testNoAcceptingSmallOffers() {
+    control.replay();
+
+    Iterable<Resource> resources =
+        Resources.subtract(
+            GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES,
+            GcExecutorLauncher.EPSILON).toResourceList();
+    Offer smallOffer = OFFER.toBuilder()
+        .clearResources()
+        .addAllResources(resources)
+        .build();
+    assertFalse(gcExecutorLauncher.willUse(smallOffer));
+  }
+
+  private static TaskStatus makeStatus(String taskId) {
+    return TaskStatus.newBuilder()
+        .setSlaveId(OFFER.getSlaveId())
+        .setState(TaskState.TASK_RUNNING)
+        .setTaskId(TaskID.newBuilder().setValue(taskId))
+        .build();
+  }
+
+  @Test
+  public void testStatusUpdate() {
+    control.replay();
+
+    assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX)));
+    assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX + "1")));
+    assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("1" + SYSTEM_TASK_PREFIX)));
+    assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("asdf")));
+  }
+
+  @Test
+  public void testGcExecutorDisabled() {
+    control.replay();
+
+    gcExecutorLauncher = new GcExecutorLauncher(
+        new GcExecutorSettings(Amount.of(1L, Time.HOURS), Optional.<String>absent()),
+        storageUtil.storage,
+        clock,
+        MoreExecutors.sameThreadExecutor(),
+        driver,
+        Suppliers.ofInstance("gc"));
+    assertFalse(gcExecutorLauncher.willUse(OFFER));
+  }
+
+  private void expectAdjustRetainedTasks(IScheduledTask... tasks) {
+    Map<String, ScheduleStatus> statuses =
+        Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS);
+    AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses);
+    TaskInfo task = gcExecutorLauncher.makeGcTask(HOST, OFFER.getSlaveId(), message);
+    driver.launchTask(OFFER.getId(), task);
+  }
+
+  private IScheduledTask makeTask(String jobName, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId("task-" + taskIdCounter.incrementAndGet())
+            .setSlaveHost(HOST)
+            .setTask(new TaskConfig()
+                .setJobName(jobName)
+                .setOwner(new Identity().setRole("role").setUser("user"))
+                .setExecutorConfig(new ExecutorConfig("aurora", "config")))));
+  }
+
+  private void expectGetTasksByHost(String host, IScheduledTask... tasks) {
+    storageUtil.expectTaskFetch(Query.slaveScoped(host), tasks);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f106eab8/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
deleted file mode 100644
index 98f5aa1..0000000
--- a/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.periodic;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class GcExecutorLauncherTest extends EasyMockTest {
-
-  private static final String HOST = "slave-host";
-
-  private static final Offer OFFER = Offer.newBuilder()
-      .setSlaveId(SlaveID.newBuilder().setValue("slave-id"))
-      .setHostname(HOST)
-      .setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build())
-      .setId(OfferID.newBuilder().setValue("offer-id"))
-      .addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList())
-      .build();
-
-  private static final String JOB_A = "jobA";
-
-  private static final Amount<Long, Time> MAX_GC_INTERVAL = Amount.of(1L, Time.HOURS);
-  private static final Optional<String> GC_EXCECUTOR_PATH = Optional.of("nonempty");
-
-  private final AtomicInteger taskIdCounter = new AtomicInteger();
-
-  private FakeClock clock;
-  private StorageTestUtil storageUtil;
-  private GcExecutorLauncher gcExecutorLauncher;
-  private GcExecutorSettings settings;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    clock = new FakeClock();
-    storageUtil.expectOperations();
-    settings = createMock(GcExecutorSettings.class);
-    expect(settings.getMaxGcInterval()).andReturn(MAX_GC_INTERVAL.as(Time.MILLISECONDS)).anyTimes();
-  }
-
-  private void replayAndCreate() {
-    control.replay();
-    gcExecutorLauncher = new GcExecutorLauncher(settings, storageUtil.storage, clock);
-  }
-
-  @Test
-  public void testPruning() throws Exception {
-    IScheduledTask thermosPrunedTask = makeTask(JOB_A, true, FAILED);
-    IScheduledTask thermosTask = makeTask(JOB_A, true, FAILED);
-    IScheduledTask nonThermosTask = makeTask(JOB_A, false, FAILED);
-
-    // First call - no tasks to be collected.
-    expectGetTasksByHost(HOST, thermosPrunedTask, thermosTask, nonThermosTask);
-    expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
-
-    // Third call - two tasks collected.
-    expectGetTasksByHost(HOST, thermosPrunedTask);
-    expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
-
-    expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH).times(5);
-
-    replayAndCreate();
-
-    // First call - no items in the cache, no tasks collected.
-    Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
-    assertTrue(taskInfo.isPresent());
-    assertRetainedTasks(taskInfo.get(), thermosPrunedTask, thermosTask, nonThermosTask);
-    ExecutorInfo executor1 = taskInfo.get().getExecutor();
-
-    // Second call - host item alive, no tasks collected.
-    clock.advance(Amount.of(15L, Time.MINUTES));
-    taskInfo = gcExecutorLauncher.createTask(OFFER);
-    assertFalse(taskInfo.isPresent());
-
-    // Third call - two tasks collected.
-    clock.advance(Amount.of(15L, Time.MINUTES));
-    taskInfo = gcExecutorLauncher.createTask(OFFER);
-    assertTrue(taskInfo.isPresent());
-    assertRetainedTasks(taskInfo.get(), thermosPrunedTask);
-
-    // Same executor should be re-used for both tasks
-    assertEquals(executor1, taskInfo.get().getExecutor());
-  }
-
-  @Test
-  public void testNoAcceptingSmallOffers() {
-    expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH);
-    replayAndCreate();
-
-    Iterable<Resource> resources =
-        Resources.subtract(
-            GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES,
-            GcExecutorLauncher.EPSILON).toResourceList();
-    Offer smallOffer = OFFER.toBuilder()
-        .clearResources()
-        .addAllResources(resources)
-        .build();
-    assertFalse(gcExecutorLauncher.createTask(smallOffer).isPresent());
-  }
-
-  private static void assertRetainedTasks(TaskInfo taskInfo, IScheduledTask... tasks)
-      throws ThriftBinaryCodec.CodingException {
-    AdjustRetainedTasks message = ThriftBinaryCodec.decode(
-        AdjustRetainedTasks.class, taskInfo.getData().toByteArray());
-    Map<String, IScheduledTask> byId = Tasks.mapById(ImmutableSet.copyOf(tasks));
-    assertNotNull(message);
-    assertEquals(Maps.transformValues(byId, Tasks.GET_STATUS), message.getRetainedTasks());
-  }
-
-  private IScheduledTask makeTask(String jobName, boolean isThermos, ScheduleStatus status) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(new AssignedTask()
-            .setTaskId("task-" + taskIdCounter.incrementAndGet())
-            .setSlaveHost(HOST)
-            .setTask(new TaskConfig()
-                .setJobName(jobName)
-                .setOwner(new Identity().setRole("role").setUser("user"))
-                .setExecutorConfig(isThermos ? new ExecutorConfig("aurora", "config") : null))));
-  }
-
-  private void expectGetTasksByHost(String host, IScheduledTask... tasks) {
-    storageUtil.expectTaskFetch(Query.slaveScoped(host), tasks);
-  }
-}


Mime
View raw message