aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Add offer reservations to preemption flow
Date Wed, 18 Dec 2013 19:58:30 GMT
Updated Branches:
  refs/heads/master d77b3df6b -> 3f65a46be


Add offer reservations to preemption flow

The reservation associates a slave id with a task id for a fixed duration.
If the task attempts to schedule itself during that time period and an offer
is available from that slave then it will be scheduled.
If another task attempts to schedule itself then it will not use the reserved offer.

Testing Done:
./gradlew clean build

Bugs closed: AURORA-11

Reviewed at https://reviews.apache.org/r/16232/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/3f65a46b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/3f65a46b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/3f65a46b

Branch: refs/heads/master
Commit: 3f65a46beacce23066627c6bebc2b481594b5ce8
Parents: d77b3df
Author: Zameer Manji <zmanji@twopensource.com>
Authored: Wed Dec 18 11:58:08 2013 -0800
Committer: Bill Farner <bill@twitter.com>
Committed: Wed Dec 18 11:58:08 2013 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  61 +++-
 .../aurora/scheduler/async/Preemptor.java       |   3 -
 .../aurora/scheduler/async/TaskGroups.java      |  63 ++---
 .../aurora/scheduler/async/TaskScheduler.java   | 264 +++++++++++++-----
 .../scheduler/async/TaskSchedulerImplTest.java  | 278 +++++++++++++++++++
 .../scheduler/async/TaskSchedulerTest.java      |  21 +-
 .../aurora/scheduler/state/PubsubTestUtil.java  |   6 +-
 7 files changed, 571 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
index db07841..faf3269 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
@@ -15,24 +15,30 @@
  */
 package com.twitter.aurora.scheduler.async;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.logging.Logger;
 
 import javax.inject.Singleton;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Key;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
-import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
 import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
+import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import com.twitter.aurora.scheduler.events.PubsubEventModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
@@ -44,9 +50,15 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Random;
 import com.twitter.common.util.TruncatedBinaryBackoff;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
 import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
 import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
+import static com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
 
 /**
  * Binding module for async task management.
@@ -123,6 +135,18 @@ public class AsyncModule extends AbstractModule {
     }
   };
 
+  @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers
while "
+      + "trying to satisfy a task preempting another.")
+  private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface PreemptionBinding { }
+
+  @VisibleForTesting
+  static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
+
   @Override
   protected void configure() {
     // Don't worry about clean shutdown, these can be daemon and cleanup-free.
@@ -164,22 +188,23 @@ public class AsyncModule extends AbstractModule {
                 MAX_RESCHEDULING_DELAY.get()));
 
         bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
-        bind(SchedulingAction.class).to(TaskScheduler.class);
-        bind(TaskScheduler.class).in(Singleton.class);
         if (ENABLE_PREEMPTOR.get()) {
-          bind(Preemptor.class).to(PreemptorImpl.class);
+          bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
           bind(PreemptorImpl.class).in(Singleton.class);
           LOG.info("Preemptor Enabled.");
         } else {
-          bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+          bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
           LOG.warning("Preemptor Disabled.");
         }
-        bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
+        expose(PREEMPTOR_KEY);
+        bind(new TypeLiteral<Amount<Long, Time>>() {
+        }).annotatedWith(PreemptionDelay.class)
             .toInstance(PREEMPTION_DELAY.get());
         bind(TaskGroups.class).in(Singleton.class);
         expose(TaskGroups.class);
       }
     });
+    bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
     binder().install(new PrivateModule() {
@@ -210,6 +235,30 @@ public class AsyncModule extends AbstractModule {
   }
 
   /**
+   * This method exists because we want to test the wiring up of TaskSchedulerImpl class
to the
+   * PubSub system in the TaskSchedulerImplTest class. The method has a complex signature
because
+   * the binding of the TaskScheduler and friends occurs in a PrivateModule which does not
interact
+   * well with the MultiBinder that backs the PubSub system.
+   */
+  @VisibleForTesting
+  static void bindTaskScheduler(
+      Binder binder,
+      final Key<Preemptor> preemptorKey,
+      final Amount<Long, Time> reservationDuration) {
+        binder.install(new PrivateModule() {
+          @Override protected void configure() {
+            bind(Preemptor.class).to(preemptorKey);
+            bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
+                .toInstance(reservationDuration);
+            bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+            bind(TaskSchedulerImpl.class).in(Singleton.class);
+            expose(TaskScheduler.class);
+          }
+        });
+        PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
+  }
+
+  /**
    * Returns offers after a random duration within a fixed window.
    */
   private static class RandomJitterReturnDelay implements OfferReturnDelay {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java b/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
index e5aeb83..a01790c 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
@@ -303,9 +303,6 @@ public interface Preemptor {
       return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
     }
 
-    // TODO(zmanji): Add throttling to prevent how much preemption a single task can cause
over
-    // time.
-    // TODO(zmanji): Get the offer queue to associate a slave with a pending task.
     @Override
     public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
       List<IAssignedTask> pendingTasks =

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
index f95f719..a59e5c8 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
@@ -74,7 +74,6 @@ public class TaskGroups implements EventSubscriber {
   private final LoadingCache<GroupKey, TaskGroup> groups;
   private final Clock clock;
   private final RescheduleCalculator rescheduleCalculator;
-  private final Preemptor preemptor;
 
   static class TaskGroupsSettings {
     private final BackoffStrategy taskGroupBackoff;
@@ -91,20 +90,18 @@ public class TaskGroups implements EventSubscriber {
       ShutdownRegistry shutdownRegistry,
       Storage storage,
       TaskGroupsSettings settings,
-      SchedulingAction schedulingAction,
+      TaskScheduler taskScheduler,
       Clock clock,
-      RescheduleCalculator rescheduleCalculator,
-      Preemptor preemptor) {
+      RescheduleCalculator rescheduleCalculator) {
 
     this(
         createThreadPool(shutdownRegistry),
         storage,
         settings.taskGroupBackoff,
         settings.rateLimiter,
-        schedulingAction,
+        taskScheduler,
         clock,
-        rescheduleCalculator,
-        preemptor);
+        rescheduleCalculator);
   }
 
   TaskGroups(
@@ -112,24 +109,22 @@ public class TaskGroups implements EventSubscriber {
       final Storage storage,
       final BackoffStrategy taskGroupBackoffStrategy,
       final RateLimiter rateLimiter,
-      final SchedulingAction schedulingAction,
+      final TaskScheduler taskScheduler,
       final Clock clock,
-      final RescheduleCalculator rescheduleCalculator,
-      final Preemptor preemptor) {
+      final RescheduleCalculator rescheduleCalculator) {
 
     this.storage = checkNotNull(storage);
     checkNotNull(executor);
     checkNotNull(taskGroupBackoffStrategy);
     checkNotNull(rateLimiter);
-    checkNotNull(schedulingAction);
+    checkNotNull(taskScheduler);
     this.clock = checkNotNull(clock);
     this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
-    this.preemptor = checkNotNull(preemptor);
 
-    final SchedulingAction rateLimitedAction = new SchedulingAction() {
-      @Override public boolean schedule(String taskId) {
+    final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
+      @Override public TaskSchedulerResult schedule(String taskId) {
         rateLimiter.acquire();
-        return schedulingAction.schedule(taskId);
+        return taskScheduler.schedule(taskId);
       }
     };
 
@@ -137,7 +132,7 @@ public class TaskGroups implements EventSubscriber {
       @Override public TaskGroup load(GroupKey key) {
         TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
         LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
-        startGroup(group, executor, rateLimitedAction);
+        startGroup(group, executor, ratelLimitedScheduler);
         return group;
       }
     });
@@ -154,7 +149,7 @@ public class TaskGroups implements EventSubscriber {
   private void startGroup(
       final TaskGroup group,
       final ScheduledExecutorService executor,
-      final SchedulingAction action) {
+      final TaskScheduler taskScheduler) {
 
     Runnable monitor = new Runnable() {
       @Override public void run() {
@@ -167,15 +162,21 @@ public class TaskGroups implements EventSubscriber {
 
           case READY:
             String id = group.pop();
-            if (action.schedule(id)) {
-              if (!maybeInvalidate(group)) {
-                executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
-              }
-            } else {
-              group.push(id, clock.nowMillis());
-              executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
-              // TODO(zmanji): Use the return value in a slave <-> task matching manner
-              preemptor.findPreemptionSlotFor(id);
+            TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
+            switch (result) {
+              case SUCCESS:
+                if (!maybeInvalidate(group)) {
+                  executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
+                }
+                break;
+
+              case TRY_AGAIN:
+                group.push(id, clock.nowMillis());
+                executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
+                break;
+
+              default:
+                throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
             }
             break;
 
@@ -290,14 +291,4 @@ public class TaskGroups implements EventSubscriber {
       return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
     }
   }
-
-  interface SchedulingAction {
-    /**
-     * Attempts to schedule a task, possibly performing irreversible actions.
-     *
-     * @param taskId The task to attempt to schedule.
-     * @return {@code true} if the task was scheduled, {@code false} otherwise.
-     */
-    boolean schedule(String taskId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
index fbd82ff..0ad9e13 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
@@ -15,6 +15,9 @@
  */
 package com.twitter.aurora.scheduler.async;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -24,13 +27,20 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.BindingAnnotation;
 
 import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
 
-import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
 import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import com.twitter.aurora.scheduler.state.StateManager;
 import com.twitter.aurora.scheduler.state.TaskAssigner;
 import com.twitter.aurora.scheduler.storage.Storage;
@@ -38,7 +48,16 @@ import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
 import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
 import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
 import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -46,86 +65,193 @@ import static com.twitter.aurora.gen.ScheduleStatus.LOST;
 import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
 
 /**
- * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each
task
- * backs off after a failed scheduling attempt.
- * <p>
- * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+ * Enables scheduling and preemption of tasks.
  */
-class TaskScheduler implements SchedulingAction {
+interface TaskScheduler extends EventSubscriber {
+
+  /**
+   * Attempts to schedule a task, possibly performing irreversible actions.
+   *
+   * @param taskId The task to attempt to schedule.
+   * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call
schedule
+   * again if TRY_AGAIN is returned.
+   */
+  TaskSchedulerResult schedule(String taskId);
 
-  private static final Logger LOG = Logger.getLogger(TaskScheduler.class.getName());
+  enum TaskSchedulerResult {
+    SUCCESS,
+    TRY_AGAIN
+  }
 
-  private final Storage storage;
-  private final StateManager stateManager;
-  private final TaskAssigner assigner;
-  private final OfferQueue offerQueue;
+  /**
+   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where
each task
+   * backs off after a failed scheduling attempt.
+   * <p>
+   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+   */
+  class TaskSchedulerImpl implements TaskScheduler {
+    /**
+     * Binding annotation for the time duration of reservations
+     */
+    @BindingAnnotation
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    @interface ReservationDuration { }
 
-  private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
-  private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
+    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
 
-  @Inject
-  TaskScheduler(
-      Storage storage,
-      StateManager stateManager,
-      TaskAssigner assigner,
-      OfferQueue offerQueue) {
+    private final Storage storage;
+    private final StateManager stateManager;
+    private final TaskAssigner assigner;
+    private final OfferQueue offerQueue;
+    private final Preemptor preemptor;
+    private final Reservations reservations;
 
-    this.storage = checkNotNull(storage);
-    this.stateManager = checkNotNull(stateManager);
-    this.assigner = checkNotNull(assigner);
-    this.offerQueue = checkNotNull(offerQueue);
-  }
+    private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
+    private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
+
+    @Inject
+    TaskSchedulerImpl(
+        Storage storage,
+        StateManager stateManager,
+        TaskAssigner assigner,
+        OfferQueue offerQueue,
+        Preemptor preemptor,
+        @ReservationDuration Amount<Long, Time> reservationDuration,
+        final Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.stateManager = checkNotNull(stateManager);
+      this.assigner = checkNotNull(assigner);
+      this.offerQueue = checkNotNull(offerQueue);
+      this.preemptor = checkNotNull(preemptor);
+      this.reservations = new Reservations(reservationDuration, clock);
+    }
+
+    private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+        final String taskId,
+        final IScheduledTask task) {
 
-  @VisibleForTesting
-  static final Optional<String> LAUNCH_FAILED_MSG =
-      Optional.of("Unknown exception attempting to schedule task.");
-
-  @Timed("task_schedule_attempt")
-  @Override
-  public boolean schedule(final String taskId) {
-    scheduleAttemptsFired.incrementAndGet();
-    try {
-      return storage.write(new MutateWork.Quiet<Boolean>() {
-        @Override public Boolean apply(MutableStoreProvider store) {
-          LOG.fine("Attempting to schedule task " + taskId);
-          Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
-          final IScheduledTask task =
-              Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery),
null);
-          if (task == null) {
-            LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+      return new Function<Offer, Optional<TaskInfo>>() {
+        @Override public Optional<TaskInfo> apply(Offer offer) {
+          Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
+          if (reservedTaskId.isPresent()) {
+            if (taskId.equals(reservedTaskId.get())) {
+              // Slave is reserved to satisfy this task.
+              return assigner.maybeAssign(offer, task);
+            } else {
+              // Slave is reserved for another task.
+              return Optional.absent();
+            }
           } else {
-            Function<Offer, Optional<TaskInfo>> assignment =
-                new Function<Offer, Optional<TaskInfo>>() {
-                  @Override public Optional<TaskInfo> apply(Offer offer) {
-                    return assigner.maybeAssign(offer, task);
-                  }
-                };
-            try {
-              if (!offerQueue.launchFirst(assignment)) {
-                // Task could not be scheduled.
-                return false;
+            // Slave is not reserved.
+            return assigner.maybeAssign(offer, task);
+          }
+        }
+      };
+    }
+
+    @VisibleForTesting
+    static final Optional<String> LAUNCH_FAILED_MSG =
+        Optional.of("Unknown exception attempting to schedule task.");
+
+    @Timed("task_schedule_attempt")
+    @Override
+    public TaskSchedulerResult schedule(final String taskId) {
+      scheduleAttemptsFired.incrementAndGet();
+      try {
+        return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
+          @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
+            LOG.fine("Attempting to schedule task " + taskId);
+            Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
+            final IScheduledTask task =
+                Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery),
null);
+            if (task == null) {
+              LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+            } else {
+              try {
+                if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+                  // Task could not be scheduled.
+                  maybePreemptFor(taskId);
+                  return TaskSchedulerResult.TRY_AGAIN;
+                }
+              } catch (OfferQueue.LaunchException e) {
+                LOG.log(Level.WARNING, "Failed to launch task.", e);
+                scheduleAttemptsFailed.incrementAndGet();
+
+                // The attempt to schedule the task failed, so we need to backpedal on the
+                // assignment.
+                // It is in the LOST state and a new task will move to PENDING to replace
it.
+                // Should the state change fail due to storage issues, that's okay.  The
task will
+                // time out in the ASSIGNED state and be moved to LOST.
+                stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
               }
-            } catch (OfferQueue.LaunchException e) {
-              LOG.log(Level.WARNING, "Failed to launch task.", e);
-              scheduleAttemptsFailed.incrementAndGet();
-
-              // The attempt to schedule the task failed, so we need to backpedal on the
assignment.
-              // It is in the LOST state and a new task will move to PENDING to replace it.
-              // Should the state change fail due to storage issues, that's okay.  The task
will
-              // time out in the ASSIGNED state and be moved to LOST.
-              stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
             }
+
+            return TaskSchedulerResult.SUCCESS;
           }
+        });
+      } catch (RuntimeException e) {
+        // We catch the generic unchecked exception here to ensure tasks are not abandoned
+        // if there is a transient issue resulting in an unchecked exception.
+        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
+        scheduleAttemptsFailed.incrementAndGet();
+        return TaskSchedulerResult.TRY_AGAIN;
+      }
+    }
 
-          return true;
-        }
-      });
-    } catch (RuntimeException e) {
-      // We catch the generic unchecked exception here to ensure tasks are not abandoned
-      // if there is a transient issue resulting in an unchecked exception.
-      LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
-      scheduleAttemptsFailed.incrementAndGet();
-      return false;
+    private void maybePreemptFor(String taskId) {
+      if (reservations.hasReservationForTask(taskId)) {
+        return;
+      }
+      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+      if (slaveId.isPresent()) {
+        this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
+      }
+    }
+
+    @Subscribe
+    public void taskChanged(final TaskStateChange stateChangeEvent) {
+      if (stateChangeEvent.getOldState() == PENDING) {
+        reservations.invalidateTask(stateChangeEvent.getTaskId());
+      }
+    }
+
+    private static class Reservations {
+      private final Cache<SlaveID, String> reservations;
+
+      Reservations(final Amount<Long, Time> duration, final Clock clock) {
+        checkNotNull(duration);
+        checkNotNull(clock);
+        this.reservations = CacheBuilder.newBuilder()
+            .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
+            .ticker(new Ticker() {
+              @Override public long read() {
+                return clock.nowNanos();
+              }
+            })
+            .build();
+        Stats.export(new StatImpl<Long>("reservation_cache_size") {
+          @Override public Long read() {
+            return reservations.size();
+          }
+        });
+      }
+
+      private synchronized void add(SlaveID slaveId, String taskId) {
+        reservations.put(slaveId, taskId);
+      }
+
+      private synchronized boolean hasReservationForTask(String taskId) {
+        return reservations.asMap().containsValue(taskId);
+      }
+
+      private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) {
+        return Optional.fromNullable(reservations.getIfPresent(slaveID));
+      }
+
+      private synchronized void invalidateTask(String taskId) {
+        reservations.asMap().values().remove(taskId);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerImplTest.java
new file mode 100644
index 0000000..4bdf0ff
--- /dev/null
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.TaskInfo;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.aurora.gen.AssignedTask;
+import com.twitter.aurora.gen.Identity;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent;
+import com.twitter.aurora.scheduler.state.PubsubTestUtil;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.state.TaskAssigner;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.testing.StorageTestUtil;
+import com.twitter.common.base.Closure;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+
+public class TaskSchedulerImplTest extends EasyMockTest {
+
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private TaskAssigner assigner;
+  private OfferQueue offerQueue;
+  private TaskScheduler scheduler;
+  private FakeClock clock;
+  private Preemptor preemptor;
+  private Amount<Long, Time> reservationDuration;
+  private Amount<Long, Time> halfReservationDuration;
+  private Closure<PubsubEvent> eventSink;
+
+  @Before
+  public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    stateManager = createMock(StateManager.class);
+    assigner = createMock(TaskAssigner.class);
+    offerQueue = createMock(OfferQueue.class);
+    reservationDuration = Amount.of(2L, Time.MINUTES);
+    halfReservationDuration = Amount.of(1L, Time.MINUTES);
+    clock = new FakeClock();
+    clock.setNowMillis(0);
+    preemptor = createMock(Preemptor.class);
+
+    Injector injector = Guice.createInjector(new AbstractModule() {
+      @Override protected void configure() {
+        PubsubTestUtil.installPubsub(binder());
+        bind(AsyncModule.PREEMPTOR_KEY).toInstance(preemptor);
+        AsyncModule.bindTaskScheduler(binder(), AsyncModule.PREEMPTOR_KEY, reservationDuration);
+        bind(OfferQueue.class).toInstance(offerQueue);
+        bind(StateManager.class).toInstance(stateManager);
+        bind(TaskAssigner.class).toInstance(assigner);
+        bind(Clock.class).toInstance(clock);
+        bind(Storage.class).toInstance(storageUtil.storage);
+      }
+    });
+
+    scheduler = injector.getInstance(TaskScheduler.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+  }
+
+  @Test
+  public void testReservationsDeniesTasksForTimePeriod() throws OfferQueue.LaunchException
{
+    IScheduledTask taskA = makeTask("a");
+    IScheduledTask taskB = makeTask("b");
+    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
+
+    storageUtil.expectOperations();
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectLaunchAttempt(false);
+    // Reserve "a" with offerA
+    expect(preemptor.findPreemptionSlotFor("a"))
+        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
+    expect(preemptor.findPreemptionSlotFor("b")).andReturn(Optional.<String>absent());
+
+    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offerA, taskB))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+
+    assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(offerA));
+
+    clock.advance(reservationDuration);
+
+    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+
+    assertEquals(secondAssignment.getValue().apply(offerA).isPresent(), true);
+  }
+
+  @Test
+  public void testReservationsExpireAfterAccepted() throws OfferQueue.LaunchException {
+    IScheduledTask taskA = makeTask("a");
+    IScheduledTask taskB = makeTask("b");
+    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
+
+    storageUtil.expectOperations();
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectLaunchAttempt(false);
+    // Reserve "a" with offerA
+    expect(preemptor.findPreemptionSlotFor("a"))
+        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+
+    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offerA, taskA))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+
+    Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offerA, taskB))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    firstAssignment.getValue().apply(offerA);
+    eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+    clock.advance(halfReservationDuration);
+    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    secondAssignment.getValue().apply(offerA);
+  }
+
+  @Test
+  public void testReservationsAcceptsWithInTimePeriod() throws OfferQueue.LaunchException
{
+    IScheduledTask taskA = makeTask("a");
+    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
+
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectLaunchAttempt(false);
+    // Reserve "a" with offerA
+    expect(preemptor.findPreemptionSlotFor("a"))
+        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+
+    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offerA, taskA))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    clock.advance(halfReservationDuration);
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+
+    firstAssignment.getValue().apply(offerA);
+  }
+
+  @Test
+  public void testReservationsCancellation() throws OfferQueue.LaunchException {
+    IScheduledTask taskA = makeTask("a");
+    IScheduledTask taskB = makeTask("b");
+    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
+
+    storageUtil.expectOperations();
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectLaunchAttempt(false);
+
+    // Reserve "a" with offerA
+    expect(preemptor.findPreemptionSlotFor("a"))
+        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+
+    Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offerA, taskB))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    clock.advance(halfReservationDuration);
+    // Task is killed by user before it is scheduled
+    eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    assignment.getValue().apply(offerA);
+  }
+
+  @Test
+  public void testReservationsExpire() throws OfferQueue.LaunchException {
+    IScheduledTask taskA = makeTask("a");
+    IScheduledTask taskB = makeTask("b");
+    Offer offer1 = Offers.makeOffer("OFFER_1", "HOST_A");
+
+    storageUtil.expectOperations();
+
+    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expectLaunchAttempt(false);
+    // Reserve "b" with offer1
+    expect(preemptor.findPreemptionSlotFor("b"))
+        .andReturn(Optional.of(offer1.getSlaveId().getValue()));
+
+    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+
+    expect(assigner.maybeAssign(offer1, taskA))
+        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    // We don't act on the reservation made by b because we want to see timeout behaviour.
+    clock.advance(reservationDuration);
+    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    firstAssignment.getValue().apply(offer1);
+  }
+
+  private IScheduledTask makeTask(String taskId) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setInstanceId(0)
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setJobName("job-" + taskId)
+                .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
+                .setEnvironment("env-" + taskId))));
+  }
+
+  private Capture<Function<Offer, Optional<TaskInfo>>> expectLaunchAttempt(boolean
taskLaunched)
+      throws OfferQueue.LaunchException {
+        Capture<Function<Offer, Optional<TaskInfo>>> assignment = createCapture();
+        expect(offerQueue.launchFirst(capture(assignment))).andReturn(taskLaunched);
+        return assignment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
index a747f2b..4773a96 100644
--- a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
@@ -48,9 +48,9 @@ import com.twitter.aurora.gen.TaskEvent;
 import com.twitter.aurora.scheduler.Driver;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
-import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
 import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl.RescheduleCalculatorSettings;
+import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.Tasks;
 import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
@@ -91,7 +91,7 @@ import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
 import static com.twitter.aurora.gen.ScheduleStatus.RUNNING;
 
 /**
- * TODO(wfarner): Break this test up to independently test TaskScheduler and OfferQueueImpl.
+ * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl.
  */
 public class TaskSchedulerTest extends EasyMockTest {
 
@@ -111,9 +111,11 @@ public class TaskSchedulerTest extends EasyMockTest {
   private OfferReturnDelay returnDelay;
   private OfferQueue offerQueue;
   private TaskGroups taskGroups;
+  private TaskScheduler scheduler;
   private FakeClock clock;
   private BackoffStrategy flappingStrategy;
   private Preemptor preemptor;
+  private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
 
   @Before
   public void setUp() {
@@ -137,8 +139,13 @@ public class TaskSchedulerTest extends EasyMockTest {
     offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenance);
     RateLimiter rateLimiter = RateLimiter.create(1);
     Amount<Long, Time> flappingThreshold = Amount.of(5L, Time.MINUTES);
-    SchedulingAction scheduler =
-        new TaskScheduler(storage, stateManager, assigner, offerQueue);
+    scheduler = new TaskSchedulerImpl(storage,
+        stateManager,
+        assigner,
+        offerQueue,
+        preemptor,
+        reservationDuration,
+        clock);
     taskGroups = new TaskGroups(
         executor,
         storage,
@@ -152,8 +159,7 @@ public class TaskSchedulerTest extends EasyMockTest {
                 flappingStrategy,
                 flappingThreshold,
                 Amount.of(5, Time.SECONDS)),
-            clock),
-        preemptor);
+            clock));
   }
 
   private Capture<Runnable> expectOffer() {
@@ -324,7 +330,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     expect(stateManager.changeState(
         Query.taskScoped("a").byStatus(PENDING),
         LOST,
-        TaskScheduler.LAUNCH_FAILED_MSG))
+        TaskSchedulerImpl.LAUNCH_FAILED_MSG))
         .andReturn(1);
 
     replayAndCreateScheduler();
@@ -349,7 +355,6 @@ public class TaskSchedulerTest extends EasyMockTest {
     expect(assigner.maybeAssign(OFFER_A, task)).andThrow(new StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
     expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3f65a46b/src/test/java/com/twitter/aurora/scheduler/state/PubsubTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/PubsubTestUtil.java b/src/test/java/com/twitter/aurora/scheduler/state/PubsubTestUtil.java
index f9d7fb6..25a822d 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/PubsubTestUtil.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/PubsubTestUtil.java
@@ -32,7 +32,7 @@ import com.twitter.common.base.ExceptionalCommand;
  * A convenience utility for unit tests that which to verify pubsub wiring.
  * TODO(wfarner): Clean this up - make it integrate more cleanly with callers and LifecycleModule.
  */
-final class PubsubTestUtil {
+public final class PubsubTestUtil {
 
   private PubsubTestUtil() {
     // Utility class.
@@ -43,7 +43,7 @@ final class PubsubTestUtil {
    *
    * @param binder Binder to install pubsub system onto.
    */
-  static void installPubsub(Binder binder) {
+  public static void installPubsub(Binder binder) {
     PubsubEventModule.installForTest(binder);
   }
 
@@ -54,7 +54,7 @@ final class PubsubTestUtil {
    * @return The pubsub event sink.
    * @throws Exception If the pubsub system failed to start.
    */
-  static Closure<PubsubEvent> startPubsub(Injector injector) throws Exception {
+  public static Closure<PubsubEvent> startPubsub(Injector injector) throws Exception
{
     // TODO(wfarner): Make it easier to write a unit test wired for pubsub events.
     // In this case, a trade-off was made to avoid installing several distant modules and
providing
     // required bindings that seem unrelated from this code.


Mime
View raw message