aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Work in progress, needs to be split into several reviews.
Date Tue, 04 Feb 2014 22:35:52 GMT
Updated Branches:
  refs/heads/wfarner/async_gc_executor_launcher [created] e68fa7b5e


Work in progress, needs to be split into several reviews.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e68fa7b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e68fa7b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e68fa7b5

Branch: refs/heads/wfarner/async_gc_executor_launcher
Commit: e68fa7b5e565eeb7e5c02099c090020cc7b14c95
Parents: 80106ee
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Jan 28 02:11:51 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Feb 4 14:34:43 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/MesosSchedulerImpl.java    |  28 +--
 .../apache/aurora/scheduler/TaskLauncher.java   |   7 +-
 .../aurora/scheduler/UserTaskLauncher.java      |   4 +-
 .../aurora/scheduler/async/HistoryPruner.java   | 103 +++++-----
 .../aurora/scheduler/async/MappedFutures.java   |  86 +++++++++
 .../aurora/scheduler/async/TaskTimeout.java     | 193 +++----------------
 .../scheduler/periodic/GcExecutorLauncher.java  |  67 +++++--
 .../scheduler/MesosSchedulerImplTest.java       |  22 +--
 .../aurora/scheduler/UserTaskLauncherTest.java  |   2 +-
 .../periodic/GcExecutorLauncherTest.java        |   8 +-
 10 files changed, 236 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
index 2f84bdf..ed0fc0a 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -22,9 +22,7 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.twitter.common.application.Lifecycle;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
@@ -34,7 +32,6 @@ import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.gen.comm.SchedulerMessage;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
@@ -48,7 +45,6 @@ import org.apache.mesos.Protos.MasterInfo;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskStatus;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
@@ -62,7 +58,6 @@ class MesosSchedulerImpl implements Scheduler {
   private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
 
   private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
-  private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
   private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
   private final AtomicLong frameworkDisconnects =
       Stats.exportLong("scheduler_framework_disconnects");
@@ -135,10 +130,6 @@ class MesosSchedulerImpl implements Scheduler {
     frameworkReregisters.incrementAndGet();
   }
 
-  private static boolean fitsInOffer(TaskInfo task, Offer offer) {
-    return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
-  }
-
   @Timed("scheduler_resource_offers")
   @Override
   public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
@@ -164,24 +155,9 @@ class MesosSchedulerImpl implements Scheduler {
       resourceOffers.incrementAndGet();
 
       // Ordering of task launchers is important here, since offers are consumed greedily.
-      // TODO(William Farner): Refactor this area of code now that the primary task launcher
-      // is asynchronous.
       for (TaskLauncher launcher : taskLaunchers) {
-        Optional<TaskInfo> task = Optional.absent();
-        try {
-          task = launcher.createTask(offer);
-        } catch (SchedulerException e) {
-          LOG.log(Level.WARNING, "Failed to schedule offers.", e);
-          failedOffers.incrementAndGet();
-        }
-
-        if (task.isPresent()) {
-          if (fitsInOffer(task.get(), offer)) {
-            driver.launchTasks(offer.getId(), ImmutableList.of(task.get()));
-            break;
-          } else {
-            LOG.warning("Insufficient resources to launch task " + task);
-          }
+        if (launcher.willUse(offer)) {
+          break;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
index 96a3ade..3002efc 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -28,16 +28,17 @@ import org.apache.mesos.Protos.TaskStatus;
 public interface TaskLauncher {
 
   /**
-   * Grants a resource offer to the task launcher, which will be passed to any subsequent task
+   * Presents a resource offer to the task launcher, which will be passed to any subsequent task
    * launchers if this one does not accept.
    * <p>
    * A task launcher may choose to retain an offer for later use.  Any retained offers must be
    * cleaned up with {@link #cancelOffer(OfferID)}.
    *
    * @param offer The resource offer.
-   * @return A task, absent if the launcher chooses not to accept the offer.
+   * @return {@code false} if the launcher will not act on the offer, or {@code true} if the
+   *         launcher may accept the offer at some point in the future.
    */
-  Optional<TaskInfo> createTask(Offer offer);
+  boolean willUse(Offer offer);
 
   /**
    * Informs the launcher that a status update has been received for a task.  If the task is not

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 010776e..fc3b1ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -59,11 +59,11 @@ class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public Optional<TaskInfo> createTask(Offer offer) {
+  public boolean willUse(Offer offer) {
     checkNotNull(offer);
 
     offerQueue.addOffer(offer);
-    return Optional.absent();
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index 9f77bfc..c43bdbb 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -18,10 +18,8 @@ package org.apache.aurora.scheduler.async;
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
@@ -29,20 +27,21 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.async.MappedFutures.ScheduledMappedFutures;
+import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -64,19 +63,13 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 public class HistoryPruner implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
 
-  private final Multimap<IJobKey, String> tasksByJob =
-      Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
-  @VisibleForTesting
-  Multimap<IJobKey, String> getTasksByJob() {
-    return tasksByJob;
-  }
-
-  private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final Clock clock;
   private final long pruneThresholdMillis;
   private final int perJobHistoryGoal;
-  private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
+  private final Storage storage;
+  private final ExecutorService executor;
+  private final ScheduledMappedFutures<String> pendingDeletes;
 
   @BindingAnnotation
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@@ -88,13 +81,16 @@ public class HistoryPruner implements EventSubscriber {
       final StateManager stateManager,
       final Clock clock,
       @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
-      @PruneThreshold int perJobHistoryGoal) {
+      @PruneThreshold int perJobHistoryGoal,
+      Storage storage) {
 
-    this.executor = checkNotNull(executor);
     this.stateManager = checkNotNull(stateManager);
     this.clock = checkNotNull(clock);
     this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
     this.perJobHistoryGoal = perJobHistoryGoal;
+    this.storage = checkNotNull(storage);
+    this.executor = checkNotNull(executor);
+    this.pendingDeletes = new ScheduledMappedFutures<>(executor);
   }
 
   @VisibleForTesting
@@ -132,14 +128,7 @@ public class HistoryPruner implements EventSubscriber {
    */
   @Subscribe
   public void tasksDeleted(final TasksDeleted event) {
-    for (IScheduledTask task : event.getTasks()) {
-      String id = Tasks.id(task);
-      tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
-      Future<?> future = taskIdToFuture.remove(id);
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+    pendingDeletes.cancel(Tasks.ids(event.getTasks()));
   }
 
   private void registerInactiveTask(
@@ -149,39 +138,39 @@ public class HistoryPruner implements EventSubscriber {
 
     LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
     // Insert the latest inactive task at the tail.
-    tasksByJob.put(jobKey, taskId);
-    Runnable runnable = new Runnable() {
+
+    // TODO(wfarner): Consider reworking this class to not cache tasksByJob, and map scheduled
+    // futures by job key.  Upon the task being triggered, it will prune the job down to the
+    // appropriate history size.
+    pendingDeletes.schedule(
+        taskId,
+        new Runnable() {
+          @Override public void run() {
+            LOG.info("Pruning expired inactive task " + taskId);
+            deleteTasks(ImmutableSet.of(taskId));
+          }
+        },
+        timeRemaining,
+        TimeUnit.MILLISECONDS);
+
+    executor.submit(new Runnable() {
       @Override public void run() {
-        LOG.info("Pruning expired inactive task " + taskId);
-        tasksByJob.remove(jobKey, taskId);
-        taskIdToFuture.remove(taskId);
-        deleteTasks(ImmutableSet.of(taskId));
-      }
-    };
-    taskIdToFuture.put(taskId, executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS));
-
-    ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
-    Collection<String> tasks = tasksByJob.get(jobKey);
-    // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
-    // multimap when accessing any of its collection views".
-    synchronized (tasksByJob) {
-      Iterator<String> iterator = tasks.iterator();
-      while (tasks.size() > perJobHistoryGoal) {
-        // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
-        // order.
-        String id = iterator.next();
-        iterator.remove();
-        pruneTaskIds.add(id);
-        Future<?> future = taskIdToFuture.remove(id);
-        if (future != null) {
-          future.cancel(false);
+        Collection<IScheduledTask> inactiveTasks = Storage.Util.weaklyConsistentFetchTasks(
+            storage,
+            Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES));
+        int tasksToPrune = inactiveTasks.size() - perJobHistoryGoal;
+        if (tasksToPrune > 0) {
+          if (inactiveTasks.size() > perJobHistoryGoal) {
+            Set<String> toPrune = FluentIterable
+                .from(Tasks.LATEST_ACTIVITY.reverse().sortedCopy(inactiveTasks))
+                .limit(tasksToPrune)
+                .transform(Tasks.SCHEDULED_TO_ID)
+                .toSet();
+            pendingDeletes.cancel(toPrune);
+            deleteTasks(toPrune);
+          }
         }
       }
-    }
-
-    Set<String> ids = pruneTaskIds.build();
-    if (!ids.isEmpty()) {
-      deleteTasks(ids);
-    }
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/async/MappedFutures.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/MappedFutures.java b/src/main/java/org/apache/aurora/scheduler/async/MappedFutures.java
new file mode 100644
index 0000000..58eba8c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/MappedFutures.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * TODO(wfarner): Consider putting this in a different package.
+ */
+public class MappedFutures<K> {
+
+  private final Set<K> activeKeys =
+      Collections.newSetFromMap(Maps.<K, Boolean>newConcurrentMap());
+  private final ExecutorService executor;
+
+  public MappedFutures(ExecutorService executor) {
+    this.executor = Preconditions.checkNotNull(executor);
+  }
+
+  public boolean submit(K key, Runnable work) {
+    boolean submitting = add(key);
+    if (submitting) {
+      executor.submit(runOnceIfValid(key, work));
+    }
+    return submitting;
+  }
+
+  protected boolean add(K key) {
+    return activeKeys.add(key);
+  }
+
+  protected Runnable runOnceIfValid(final K key, final Runnable work) {
+    return new Runnable() {
+      @Override public void run() {
+        if (activeKeys.remove(key)) {
+          work.run();
+        }
+      }
+    };
+  }
+
+  public boolean cancel(K key) {
+    return activeKeys.remove(key);
+  }
+
+  public void cancel(Set<K> keys) {
+    activeKeys.removeAll(keys);
+  }
+
+  public static class ScheduledMappedFutures<K> extends MappedFutures<K> {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    public ScheduledMappedFutures(ScheduledExecutorService executorService) {
+      super(executorService);
+      this.scheduledExecutor = executorService;
+    }
+
+    public boolean schedule(K key, Runnable work, long delay, TimeUnit timeUnit) {
+      boolean submitting = add(key);
+      if (submitting) {
+        scheduledExecutor.schedule(runOnceIfValid(key, work), delay, timeUnit);
+      }
+      return submitting;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 64a1941..6956ace 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -16,9 +16,7 @@
 package org.apache.aurora.scheduler.async;
 
 import java.util.EnumSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,21 +25,15 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.eventbus.Subscribe;
+import com.twitter.common.collections.Pair;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.async.MappedFutures.ScheduledMappedFutures;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -59,9 +51,6 @@ class TaskTimeout implements EventSubscriber {
   static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
 
   @VisibleForTesting
-  static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
-
-  @VisibleForTesting
   static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
 
   @VisibleForTesting
@@ -71,76 +60,22 @@ class TaskTimeout implements EventSubscriber {
       ScheduleStatus.RESTARTING,
       ScheduleStatus.KILLING);
 
-  private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
-
-  private static final class TimeoutKey {
-    private final String taskId;
-    private final ScheduleStatus status;
-
-    private TimeoutKey(String taskId, ScheduleStatus status) {
-      this.taskId = taskId;
-      this.status = status;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(taskId, status);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof TimeoutKey)) {
-        return false;
-      }
-      TimeoutKey key = (TimeoutKey) o;
-      return Objects.equal(taskId, key.taskId)
-          && (status == key.status);
-    }
-
-    @Override
-    public String toString() {
-      return taskId + ":" + status;
-    }
-  }
-
-  private final ScheduledExecutorService executor;
+  private final ScheduledMappedFutures<Pair<String, ScheduleStatus>> timeouts;
   private final StateManager stateManager;
   private final long timeoutMillis;
-  private final Clock clock;
   private final AtomicLong timedOutTasks;
 
   @Inject
   TaskTimeout(
       ScheduledExecutorService executor,
       StateManager stateManager,
-      final Clock clock,
       Amount<Long, Time> timeout,
       StatsProvider statsProvider) {
 
-    this.executor = checkNotNull(executor);
+    this.timeouts = new ScheduledMappedFutures<>(executor);
     this.stateManager = checkNotNull(stateManager);
     this.timeoutMillis = timeout.as(Time.MILLISECONDS);
-    this.clock = checkNotNull(clock);
     this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-
-    exportStats(statsProvider);
-  }
-
-  private void registerTimeout(TimeoutKey key) {
-    // This is an obvious check-then-act, but:
-    //   - there isn't much of a better option, given that we have to get the Future before
-    //     inserting into the map
-    //   - a key collision only happens in practice if something is wrong externally to this class
-    //     (double event for the same state)
-    //   - the outcome is low-risk, we would wind up with a redundant Future that will eventually
-    //     no-op
-    if (!futures.containsKey(key)) {
-      Future<?> timeoutHandler = executor.schedule(
-          new TimedOutTaskHandler(key),
-          timeoutMillis,
-          TimeUnit.MILLISECONDS);
-      futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
-    }
   }
 
   private static boolean isTransient(ScheduleStatus status) {
@@ -149,107 +84,43 @@ class TaskTimeout implements EventSubscriber {
 
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
-    String taskId = change.getTaskId();
-    ScheduleStatus newState = change.getNewState();
+    final String taskId = change.getTaskId();
+    final ScheduleStatus newState = change.getNewState();
     if (change.isTransition() && isTransient(change.getOldState().get())) {
-      TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState().get());
-      Context context = futures.remove(oldKey);
-      if (context != null) {
-        LOG.fine("Canceling state timeout for task " + oldKey);
-        context.future.cancel(false);
-      }
+      timeouts.cancel(Pair.of(taskId, change.getOldState().get()));
     }
 
     if (isTransient(newState)) {
-      registerTimeout(new TimeoutKey(taskId, change.getNewState()));
-    }
-  }
-
-  private class TimedOutTaskHandler implements Runnable {
-    private final TimeoutKey key;
-
-    TimedOutTaskHandler(TimeoutKey key) {
-      this.key = key;
-    }
-
-    @Override public void run() {
-      Context context = futures.get(key);
-      try {
-        if (context == null) {
-          LOG.warning("Timeout context not found for " + key);
-          return;
-        }
-
-        LOG.info("Timeout reached for task " + key);
-        // This query acts as a CAS by including the state that we expect the task to be in if the
-        // timeout is still valid.  Ideally, the future would have already been canceled, but in the
-        // event of a state transition race, including transientState prevents an unintended
-        // task timeout.
-        // Note: This requires LOST transitions trigger Driver.killTask.
-        if (stateManager.changeState(
-            key.taskId,
-            Optional.of(key.status),
-            ScheduleStatus.LOST,
-            TIMEOUT_MESSAGE)) {
-
-          timedOutTasks.incrementAndGet();
-        } else {
-          LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
-        }
-      } finally {
-        futures.remove(key);
-      }
-    }
-  }
-
-  private class Context {
-    private final long timestampMillis;
-    private final Future<?> future;
-
-    Context(long timestampMillis, Future<?> future) {
-      this.timestampMillis = timestampMillis;
-      this.future = future;
+      timeouts.schedule(
+          Pair.of(taskId, change.getNewState()),
+          new Runnable() {
+            @Override public void run() {
+              LOG.info("Timeout reached for task " + taskId + ":" + newState);
+              // This query acts as a CAS by including the state that we expect the task to be in if
+              // the timeout is still valid.  Ideally, the future would have already been canceled,
+              // but in the event of a state transition race, including transientState prevents an
+              // unintended task timeout.
+              // Note: This requires LOST transitions trigger Driver.killTask.
+              if (stateManager.changeState(
+                  taskId,
+                  Optional.of(newState),
+                  ScheduleStatus.LOST,
+                  TIMEOUT_MESSAGE)) {
+
+                timedOutTasks.incrementAndGet();
+              } else {
+                LOG.warning(
+                    "Task " + taskId + " does not exist, or was not in the expected state.");
+              }
+            }
+          },
+          timeoutMillis,
+          TimeUnit.MILLISECONDS);
     }
   }
 
-  private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
-    @Override public Long apply(Context context) {
-      return context.timestampMillis;
-    }
-  };
-
-  private static final Ordering<Context> TIMESTAMP_ORDER =
-      Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
-
   @VisibleForTesting
   static String waitingTimeStatName(ScheduleStatus status) {
     return "scheduler_max_" + status + "_waiting_ms";
   }
-
-  private void exportStats(StatsProvider statsProvider) {
-    statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
-      @Override public Number get() {
-          return futures.size();
-        }
-    });
-
-    for (final ScheduleStatus status : TRANSIENT_STATES) {
-      statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
-        private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
-          @Override public boolean apply(TimeoutKey key) {
-            return key.status == status;
-          }
-        };
-
-        @Override public Number get() {
-          Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
-          if (Iterables.isEmpty(matches)) {
-            return 0L;
-          } else {
-            return clock.nowMillis() - TIMESTAMP_ORDER.min(matches).timestampMillis;
-          }
-        }
-      });
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
index f0d4fbc..0403bf0 100644
--- a/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.periodic;
 
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
@@ -25,6 +26,7 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Maps;
@@ -40,7 +42,9 @@ import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.async.MappedFutures;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -51,6 +55,7 @@ import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskStatus;
@@ -64,8 +69,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class GcExecutorLauncher implements TaskLauncher {
   private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
 
-  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
-
   @VisibleForTesting
   static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
       new Resources(0.2, Amount.of(128L, Data.MB), Amount.of(16L, Data.MB), 0);
@@ -81,36 +84,37 @@ public class GcExecutorLauncher implements TaskLauncher {
   private static final String SYSTEM_TASK_PREFIX = "system-gc-";
   private static final String EXECUTOR_NAME = "aurora.gc";
 
+  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
+
   private final GcExecutorSettings settings;
   private final Storage storage;
   private final Clock clock;
   private final Cache<String, Long> pulses;
+  private final Driver driver;
+  private final MappedFutures<OfferID> pendingRuns;
 
   @Inject
   GcExecutorLauncher(
       GcExecutorSettings settings,
       Storage storage,
-      Clock clock) {
+      Clock clock,
+      ExecutorService executor,
+      Driver driver) {
 
     this.settings = checkNotNull(settings);
     this.storage = checkNotNull(storage);
     this.clock = checkNotNull(clock);
+    this.driver = checkNotNull(driver);
+    this.pendingRuns = new MappedFutures<>(executor);
 
     this.pulses = CacheBuilder.newBuilder()
         .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
         .build();
   }
 
-  @Override
-  public Optional<TaskInfo> createTask(Offer offer) {
-    if (!settings.getGcExecutorPath().isPresent()
-        || !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
-        || isAlive(offer.getHostname())) {
-      return Optional.absent();
-    }
-
+  private TaskInfo makeGcTask(String hostName, SlaveID slaveId) {
     Set<IScheduledTask> tasksOnHost =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(offer.getHostname()));
+        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(hostName));
     AdjustRetainedTasks message = new AdjustRetainedTasks()
         .setRetainedTasks(Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS));
     byte[] data;
@@ -118,26 +122,51 @@ public class GcExecutorLauncher implements TaskLauncher {
       data = ThriftBinaryCodec.encode(message);
     } catch (CodingException e) {
       LOG.severe("Failed to encode retained tasks message: " + message);
-      return Optional.absent();
+      throw Throwables.propagate(e);
     }
 
     tasksCreated.incrementAndGet();
-    pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
 
     ExecutorInfo.Builder executor = ExecutorInfo.newBuilder()
         .setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
         .setName(EXECUTOR_NAME)
-        .setSource(offer.getHostname())
+        .setSource(hostName)
         .addAllResources(GC_EXECUTOR_RESOURCES.toResourceList())
         .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
 
-    return Optional.of(TaskInfo.newBuilder().setName("system-gc")
+    return TaskInfo.newBuilder().setName("system-gc")
         .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + UUID.randomUUID().toString()))
-        .setSlaveId(offer.getSlaveId())
+        .setSlaveId(slaveId)
         .setData(ByteString.copyFrom(data))
         .setExecutor(executor)
         .addAllResources(EPSILON.toResourceList())
-        .build());
+        .build();
+  }
+
+  private static boolean fitsInOffer(TaskInfo task, Offer offer) {
+    return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
+  }
+
+  @Override
+  public boolean willUse(final Offer offer) {
+    if (!settings.getGcExecutorPath().isPresent()
+        || !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
+        || isAlive(offer.getHostname())) {
+
+      return false;
+    }
+
+    return pendingRuns.submit(offer.getId(), new Runnable() {
+      @Override public void run() {
+        TaskInfo task = makeGcTask(offer.getHostname(), offer.getSlaveId());
+        if (fitsInOffer(task, offer)) {
+          driver.launchTask(offer.getId(), task);
+        } else {
+          LOG.warning("Insufficient resources to launch task " + task);
+        }
+        pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
+      }
+    });
   }
 
   @Override
@@ -152,7 +181,7 @@ public class GcExecutorLauncher implements TaskLauncher {
 
   @Override
   public void cancelOffer(OfferID offer) {
-    // No-op.
+    pendingRuns.cancel(offer);
   }
 
   private boolean isAlive(String hostname) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
index fd4d5d5..8e70b58 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
@@ -155,8 +155,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new OfferFixture() {
       @Override void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(userLauncher.willUse(OFFER)).andReturn(Optional.<TaskInfo>absent());
       }
     }.run();
   }
@@ -166,7 +166,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new OfferFixture() {
       @Override void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
+        expect(systemLauncher.willUse(OFFER)).andReturn(Optional.of(TASK));
         expectLaunch(TASK);
       }
     }.run();
@@ -177,8 +177,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new OfferFixture() {
       @Override void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
+        expect(systemLauncher.willUse(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(userLauncher.willUse(OFFER)).andReturn(Optional.of(TASK));
         expectLaunch(TASK);
       }
     }.run();
@@ -189,8 +189,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new OfferFixture() {
       @Override void respondToOffer() throws Exception {
         expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.of(BIGGER_TASK));
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER)).andReturn(Optional.of(BIGGER_TASK));
+        expect(userLauncher.willUse(OFFER)).andReturn(Optional.<TaskInfo>absent());
       }
     }.run();
   }
@@ -240,11 +240,11 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       @Override void expectations() throws Exception {
         expectOfferAttributesSaved(OFFER);
         expectOfferAttributesSaved(OFFER_2);
-        expect(systemLauncher.createTask(OFFER)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER)).andReturn(Optional.of(TASK));
+        expect(systemLauncher.willUse(OFFER)).andReturn(Optional.<TaskInfo>absent());
+        expect(userLauncher.willUse(OFFER)).andReturn(Optional.of(TASK));
         expectLaunch(TASK);
-        expect(systemLauncher.createTask(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
-        expect(userLauncher.createTask(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
+        expect(systemLauncher.willUse(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
+        expect(userLauncher.willUse(OFFER_2)).andReturn(Optional.<TaskInfo>absent());
       }
 
       @Override void test() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index ffc37db..5417aa9 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -83,7 +83,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(launcher.createTask(OFFER).isPresent());
+    assertFalse(launcher.willUse(OFFER).isPresent());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e68fa7b5/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
index 98f5aa1..87df370 100644
--- a/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncherTest.java
@@ -114,19 +114,19 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     replayAndCreate();
 
     // First call - no items in the cache, no tasks collected.
-    Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
+    Optional<TaskInfo> taskInfo = gcExecutorLauncher.willUse(OFFER);
     assertTrue(taskInfo.isPresent());
     assertRetainedTasks(taskInfo.get(), thermosPrunedTask, thermosTask, nonThermosTask);
     ExecutorInfo executor1 = taskInfo.get().getExecutor();
 
     // Second call - host item alive, no tasks collected.
     clock.advance(Amount.of(15L, Time.MINUTES));
-    taskInfo = gcExecutorLauncher.createTask(OFFER);
+    taskInfo = gcExecutorLauncher.willUse(OFFER);
     assertFalse(taskInfo.isPresent());
 
     // Third call - two tasks collected.
     clock.advance(Amount.of(15L, Time.MINUTES));
-    taskInfo = gcExecutorLauncher.createTask(OFFER);
+    taskInfo = gcExecutorLauncher.willUse(OFFER);
     assertTrue(taskInfo.isPresent());
     assertRetainedTasks(taskInfo.get(), thermosPrunedTask);
 
@@ -147,7 +147,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         .clearResources()
         .addAllResources(resources)
         .build();
-    assertFalse(gcExecutorLauncher.createTask(smallOffer).isPresent());
+    assertFalse(gcExecutorLauncher.willUse(smallOffer).isPresent());
   }
 
   private static void assertRetainedTasks(TaskInfo taskInfo, IScheduledTask... tasks)


Mime
View raw message