aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [6/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:03 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
new file mode 100644
index 0000000..14bf265
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
+import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
+
+/**
+ * Tracks the Offers currently known by the scheduler.
+ */
+public interface OfferManager extends EventSubscriber {
+
+  /**
+   * Notifies the scheduler of a new resource offer.
+   *
+   * @param offer Newly-available resource offer.
+   */
+  void addOffer(HostOffer offer);
+
+  /**
+   * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
+   * tasks against the offer.
+   *
+   * @param offer Canceled offer.
+   */
+  void cancelOffer(OfferID offer);
+
+  /**
+   * Launches a task on the first held offer for which the {@code acceptor} returns a successful
+   * {@link Assignment}.
+   *
+   * @param acceptor Function that determines if an offer is accepted.
+   * @param groupKey Task group key.
+   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
+   *         {@code acceptor}.
+   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
+   *                         task.
+   */
+  boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
+      throws LaunchException;
+
+  /**
+   * Notifies the offer queue that a host's attributes have changed.
+   *
+   * @param change State change notification.
+   */
+  void hostAttributesChanged(HostAttributesChanged change);
+
+  /**
+   * Gets the offers that the scheduler is holding.
+   *
+   * @return A snapshot of the offers that the scheduler is currently holding.
+   */
+  Iterable<HostOffer> getOffers();
+
+  /**
+   * Gets an offer for the given slave ID.
+   *
+   * @param slaveId Slave ID to get offer for.
+   * @return An offer for the slave ID, or absent if no offer is held for that slave.
+   */
+  Optional<HostOffer> getOffer(SlaveID slaveId);
+
+  /**
+   * Calculates the amount of time before an offer should be 'returned' by declining it.
+   * The delay is calculated for each offer that is received, so the return delay may be
+   * fixed or variable.
+   * <p>
+   * {@code OfferManagerImpl#addOffer} reads this supplier once for every offer it decides to
+   * hold, and converts the result to milliseconds when scheduling the decline.
+   */
+  interface OfferReturnDelay extends Supplier<Amount<Long, Time>> {
+  }
+
+  /**
+   * Signals an unexpected failure while trying to launch a task against an accepted offer.
+   */
+  class LaunchException extends Exception {
+    /** Creates an exception that carries only a description of the failure. */
+    LaunchException(String message) {
+      super(message);
+    }
+
+    /** Creates an exception that carries a description and the underlying cause. */
+    LaunchException(String message, Throwable rootCause) {
+      super(message, rootCause);
+    }
+  }
+
+  class OfferManagerImpl implements OfferManager {
+    @VisibleForTesting
+    static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName());
+
+    private final HostOffers hostOffers = new HostOffers();
+    private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
+
+    private final Driver driver;
+    private final OfferReturnDelay returnDelay;
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Creates an offer manager.
+     *
+     * @param driver Driver used to decline offers and launch tasks.
+     * @param returnDelay Supplies how long each held offer is kept before it is declined.
+     * @param executor Executor on which the delayed declines are scheduled.
+     */
+    @Inject
+    @VisibleForTesting
+    public OfferManagerImpl(
+        Driver driver,
+        OfferReturnDelay returnDelay,
+        @AsyncExecutor ScheduledExecutorService executor) {
+
+      // Validate everything up front, then assign.
+      requireNonNull(driver);
+      requireNonNull(returnDelay);
+      requireNonNull(executor);
+
+      this.driver = driver;
+      this.returnDelay = returnDelay;
+      this.executor = executor;
+    }
+
+    @Override
+    public void addOffer(final HostOffer offer) {
+      // We run a slight risk of a race here, which is acceptable.  The worst case is that we
+      // temporarily hold two offers for the same host, which should be corrected when we return
+      // them after the return delay.
+      // There's also a chance that we return an offer for compaction ~simultaneously with the
+      // same-host offer being canceled/returned.  This is also fine.
+      Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId());
+      if (sameSlave.isPresent()) {
+        // If there are existing offers for the slave, decline all of them so the master can
+        // compact all of those offers into a single offer and send them back.
+        LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue()
+            + " for compaction.");
+        decline(offer.getOffer().getId());
+        removeAndDecline(sameSlave.get().getOffer().getId());
+      } else {
+        hostOffers.add(offer);
+        executor.schedule(
+            new Runnable() {
+              @Override
+              public void run() {
+                removeAndDecline(offer.getOffer().getId());
+              }
+            },
+            returnDelay.get().as(Time.MILLISECONDS),
+            TimeUnit.MILLISECONDS);
+      }
+    }
+
+    /**
+     * Declines an offer, but only if it was still being held and could be removed.
+     *
+     * @param id ID of the offer to remove and decline.
+     */
+    void removeAndDecline(OfferID id) {
+      if (!removeFromHostOffers(id)) {
+        // Already gone (canceled, launched, or returned earlier); nothing to decline.
+        return;
+      }
+      decline(id);
+    }
+
+    /**
+     * Declines an offer through the driver.  Locally held offers are not modified.
+     *
+     * @param id ID of the offer to decline.
+     */
+    void decline(OfferID id) {
+      // Only build the message (and the id's string form) when FINE is enabled, matching the
+      // guarded FINE logging used by the other methods in this file.
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.fine("Declining offer " + id);
+      }
+      driver.declineOffer(id);
+    }
+
+    @Override
+    public void cancelOffer(OfferID offerId) {
+      // A canceled offer is simply forgotten; it is not declined.  Whether it was actually
+      // being held is irrelevant here, so the removal result is ignored.
+      removeFromHostOffers(offerId);
+    }
+
+    /**
+     * Stops tracking an offer.
+     *
+     * @param offerId ID of the offer to remove; must not be null.
+     * @return {@code true} if the offer was being held and has been removed.
+     */
+    private boolean removeFromHostOffers(OfferID offerId) {
+      // An accept/remove race on an offer is tolerated: the master will mark the task as
+      // LOST and it will be retried.
+      return hostOffers.remove(requireNonNull(offerId));
+    }
+
+    @Override
+    public Iterable<HostOffer> getOffers() {
+      // Read-only view over the held offers, in preference order.
+      Iterable<HostOffer> held = hostOffers.getWeaklyConsistentOffers();
+      return held;
+    }
+
+    @Override
+    public Optional<HostOffer> getOffer(SlaveID slaveId) {
+      // Absent when no offer is currently held for the slave.
+      Optional<HostOffer> held = hostOffers.get(slaveId);
+      return held;
+    }
+
+    /**
+     * Updates the preference of a host's offers.
+     * <p>
+     * If an offer is held for the host named in the change, it is replaced with a copy carrying
+     * the new attributes so that it is re-sorted; otherwise the notification has no effect.
+     *
+     * @param change Host change notification.
+     */
+    @Subscribe
+    @Override
+    public void hostAttributesChanged(HostAttributesChanged change) {
+      hostOffers.updateHostAttributes(change.getAttributes());
+    }
+
+    /**
+     * Handles a driver disconnect.  Every offer held at that point is invalid, so all of them
+     * (and any static bans recorded against them) are dropped without being declined.
+     *
+     * @param event Disconnected event.
+     */
+    @Subscribe
+    public void driverDisconnected(DriverDisconnected event) {
+      LOG.info("Clearing stale offers since the driver is disconnected.");
+
+      // Flush everything in one step; the indices are cleared together under the same lock.
+      hostOffers.clear();
+    }
+
+    /**
+     * A container for the data structures used by this class, to make it easier to reason about
+     * the different indices used and their consistency.
+     */
+    private static class HostOffers {
+      private static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
+          // Currently, the only preference is based on host maintenance status.
+          Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
+              .onResultOf(new Function<HostOffer, MaintenanceMode>() {
+                @Override
+                public MaintenanceMode apply(HostOffer offer) {
+                  return offer.getAttributes().getMode();
+                }
+              })
+              .compound(Ordering.arbitrary());
+
+      private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+      private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
+      private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
+      private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+      // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
+      // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+      // scheduling attempts. See Assignment.Result for more details on static ban.
+      private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
+
+      HostOffers() {
+        // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
+        // Could track this separately if it turns out to pose problems.
+        Stats.exportSize("outstanding_offers", offers);
+      }
+
+      synchronized Optional<HostOffer> get(SlaveID slaveId) {
+        return Optional.fromNullable(offersBySlave.get(slaveId));
+      }
+
+      synchronized void add(HostOffer offer) {
+        offers.add(offer);
+        offersById.put(offer.getOffer().getId(), offer);
+        offersBySlave.put(offer.getOffer().getSlaveId(), offer);
+        offersByHost.put(offer.getOffer().getHostname(), offer);
+      }
+
+      synchronized boolean remove(OfferID id) {
+        HostOffer removed = offersById.remove(id);
+        if (removed != null) {
+          offers.remove(removed);
+          offersBySlave.remove(removed.getOffer().getSlaveId());
+          offersByHost.remove(removed.getOffer().getHostname());
+          staticallyBannedOffers.removeAll(id);
+        }
+        return removed != null;
+      }
+
+      synchronized void updateHostAttributes(IHostAttributes attributes) {
+        HostOffer offer = offersByHost.remove(attributes.getHost());
+        if (offer != null) {
+          // Remove and re-add a host's offer to re-sort based on its new hostStatus
+          remove(offer.getOffer().getId());
+          add(new HostOffer(offer.getOffer(),  attributes));
+        }
+      }
+
+      synchronized Iterable<HostOffer> getWeaklyConsistentOffers() {
+        return Iterables.unmodifiableIterable(offers);
+      }
+
+      synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
+        boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
+        if (LOG.isLoggable(Level.FINE)) {
+          LOG.fine(String.format(
+              "Host offer %s is statically banned for %s: %s",
+              offer,
+              groupKey,
+              result));
+        }
+        return result;
+      }
+
+      synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
+        OfferID offerId = offer.getOffer().getId();
+        if (offersById.containsKey(offerId)) {
+          staticallyBannedOffers.put(offerId, groupKey);
+
+          if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine(
+                String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
+          }
+        }
+      }
+
+      synchronized void clear() {
+        offers.clear();
+        offersById.clear();
+        offersBySlave.clear();
+        offersByHost.clear();
+        staticallyBannedOffers.clear();
+      }
+    }
+
+    @Timed("offer_queue_launch_first")
+    @Override
+    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
+        throws LaunchException {
+
+      // This method must not be called concurrently.  Concurrent callers could race to accept
+      // the same offers from different threads.
+
+      for (HostOffer candidate : hostOffers.getWeaklyConsistentOffers()) {
+        if (hostOffers.isStaticallyBanned(candidate, groupKey)) {
+          // Known to never match this task group; skip without consulting the acceptor.
+          continue;
+        }
+        if (acceptOffer(candidate, acceptor, groupKey)) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    /**
+     * Presents a single offer to the acceptor and acts on the resulting assignment.
+     *
+     * @param offer Offer to evaluate.
+     * @param acceptor Function that determines if the offer is accepted.
+     * @param groupKey Task group key, used to record static bans.
+     * @return {@code true} if a task was launched on the offer, {@code false} otherwise.
+     * @throws LaunchException If the offer was accepted but had already been removed, or if
+     *                         the driver failed to launch the task.
+     */
+    @Timed("offer_queue_accept_offer")
+    protected boolean acceptOffer(
+        HostOffer offer,
+        Function<HostOffer, Assignment> acceptor,
+        TaskGroupKey groupKey) throws LaunchException {
+
+      Assignment assignment = acceptor.apply(offer);
+      switch (assignment.getResult()) {
+
+        case SUCCESS:
+          // The offer may have been removed after it was read from the iterator.  In that
+          // case it is no longer in hostOffers, and the task can immediately be sent back to
+          // LOST for quick reschedule.
+          // Removing while iterating relies on the weakly-consistent iterator provided by
+          // ConcurrentSkipListSet.
+          if (!hostOffers.remove(offer.getOffer().getId())) {
+            offerRaces.incrementAndGet();
+            throw new LaunchException(
+                "Accepted offer no longer exists in offer queue, likely data race.");
+          }
+
+          try {
+            driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
+            return true;
+          } catch (IllegalStateException e) {
+            // TODO(William Farner): Catch only the checked exception produced by Driver
+            // once it changes from throwing IllegalStateException when the driver is not yet
+            // registered.
+            throw new LaunchException("Failed to launch task.", e);
+          }
+
+        case FAILURE_STATIC_MISMATCH:
+          // A static mismatch can never succeed for this group, so exclude the offer from
+          // further attempts to match against all tasks from the same group.
+          hostOffers.addStaticGroupBan(offer, groupKey);
+          break;
+
+        default:
+          break;
+      }
+
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
new file mode 100644
index 0000000..6ab413a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNegative;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Random;
+
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+
+/**
+ * Binding module for resource offer management.
+ */
+public class OffersModule extends AbstractModule {
+
+  @CmdLine(name = "min_offer_hold_time",
+      help = "Minimum amount of time to hold a resource offer before declining.")
+  @NotNegative
+  private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
+      Arg.create(Amount.of(5, Time.MINUTES));
+
+  @CmdLine(name = "offer_hold_jitter_window",
+      help = "Maximum amount of random jitter to add to the offer hold time window.")
+  @NotNegative
+  private static final Arg<Amount<Integer, Time>> OFFER_HOLD_JITTER_WINDOW =
+      Arg.create(Amount.of(1, Time.MINUTES));
+
+  @Override
+  protected void configure() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(OfferManager.OfferReturnDelay.class).toInstance(
+            new RandomJitterReturnDelay(
+                MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS),
+                OFFER_HOLD_JITTER_WINDOW.get().as(Time.MILLISECONDS),
+                new Random.SystemRandom(new java.util.Random())));
+        bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
+        bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+        expose(OfferManager.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java
new file mode 100644
index 0000000..a3e63bf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Random;
+
+import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Returns offers after a random duration within a fixed window.
+ */
+@VisibleForTesting
+class RandomJitterReturnDelay implements OfferReturnDelay {
+  private final int minHoldTimeMs;
+  private final int maxJitterWindowMs;
+  private final Random random;
+
+  RandomJitterReturnDelay(int minHoldTimeMs, int maxJitterWindowMs, Random random) {
+    checkArgument(minHoldTimeMs >= 0);
+    checkArgument(maxJitterWindowMs >= 0);
+
+    this.minHoldTimeMs = minHoldTimeMs;
+    this.maxJitterWindowMs = maxJitterWindowMs;
+    this.random = Objects.requireNonNull(random);
+  }
+
+  @Override
+  public Amount<Long, Time> get() {
+    return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
new file mode 100644
index 0000000..2551057
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A bi-directional cache of items. Entries are purged from cache after
+ * {@link BiCacheSettings#expireAfter}.
+ * <p>
+ * Besides the forward key-to-value cache, an inverse value-to-keys index is maintained.  The
+ * inverse index is updated on {@link #put} and {@link #remove}, and by a removal listener on
+ * the forward cache.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class BiCache<K, V> {
+
+  /**
+   * Configuration for a {@link BiCache}: the entry lifetime and the name of the exported
+   * cache size stat.
+   */
+  public static class BiCacheSettings {
+    private final Amount<Long, Time> expireAfter;
+    private final String cacheSizeStatName;
+
+    public BiCacheSettings(Amount<Long, Time> expireAfter, String cacheSizeStatName) {
+      this.expireAfter = requireNonNull(expireAfter);
+      this.cacheSizeStatName = requireNonNull(cacheSizeStatName);
+    }
+  }
+
+  private final Cache<K, V> cache;
+  private final Multimap<V, K> inverse = HashMultimap.create();
+
+  /**
+   * Creates a cache.
+   *
+   * @param statsProvider Used to export a gauge reporting the cache size.
+   * @param settings Entry lifetime and stat name.
+   * @param clock Time source backing the cache's expiration ticker.
+   */
+  @Inject
+  public BiCache(
+      StatsProvider statsProvider,
+      BiCacheSettings settings,
+      final Clock clock) {
+
+    requireNonNull(clock);
+    requireNonNull(settings);
+    requireNonNull(statsProvider);
+
+    // Convert the lifetime in milliseconds rather than whole minutes.  A whole-minute
+    // conversion truncates, so e.g. a 30 second lifetime would become zero, which makes the
+    // cache evict entries immediately.
+    this.cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(settings.expireAfter.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)
+        .ticker(new Ticker() {
+          @Override
+          public long read() {
+            return clock.nowNanos();
+          }
+        })
+        .removalListener(new RemovalListener<K, V>() {
+          @Override
+          public void onRemoval(RemovalNotification<K, V> notification) {
+            // Keep the inverse index in step with whatever leaves the forward cache.
+            inverse.remove(notification.getValue(), notification.getKey());
+          }
+        })
+        .build();
+
+    statsProvider.makeGauge(
+        settings.cacheSizeStatName,
+        new Supplier<Long>() {
+          @Override
+          public Long get() {
+            return cache.size();
+          }
+        });
+  }
+
+  /**
+   * Puts a new key/value pair.
+   *
+   * @param key Key to add.
+   * @param value Value to add.
+   */
+  public synchronized void put(K key, V value) {
+    requireNonNull(key);
+    requireNonNull(value);
+    cache.put(key, value);
+    inverse.put(value, key);
+  }
+
+  /**
+   * Gets a cached value by key.
+   *
+   * @param key Key to get value for.
+   * @return Optional of value.
+   */
+  public synchronized Optional<V> get(K key) {
+    return Optional.fromNullable(cache.getIfPresent(key));
+  }
+
+  /**
+   * Gets a set of keys for a given value.
+   *
+   * @param value Value to get all keys for.
+   * @return An immutable copy of the keys mapped to the value, or an empty set if the value
+   *         does not exist.
+   */
+  public synchronized Set<K> getByValue(V value) {
+    // Cache items are lazily removed by routine maintenance operations during get/write access.
+    // Forcing cleanup here to ensure proper data integrity.
+    cache.cleanUp();
+    return ImmutableSet.copyOf(inverse.get(value));
+  }
+
+  /**
+   * Removes a key/value pair from cache.
+   *
+   * @param key Key to remove.
+   * @param value Value to remove.
+   */
+  public synchronized void remove(K key, V value) {
+    inverse.remove(value, key);
+    cache.invalidate(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java
new file mode 100644
index 0000000..ce3bc7e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+/**
+ * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
+ */
+@VisibleForTesting
+public interface ClusterState {
+
+  /**
+   * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are
+   * assigned to.
+   * <p>
+   * TODO(wfarner): Return a more minimal type than IAssignedTask here.
+   * NOTE(review): the signature already returns {@link PreemptionVictim} values rather than
+   * IAssignedTask, so the TODO above may be stale - confirm before acting on it.
+   *
+   * @return Active tasks and their associated slave IDs.
+   */
+  Multimap<String, PreemptionVictim> getSlavesToActiveTasks();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java
new file mode 100644
index 0000000..42e2ca4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * A cached view of cluster state, kept up to date by pubsub notifications.
+ */
+public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber {
+
+  private final Multimap<String, PreemptionVictim> victims =
+      Multimaps.synchronizedMultimap(HashMultimap.create());
+
+  @Override
+  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
+    return Multimaps.unmodifiableMultimap(victims);
+  }
+
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    synchronized (victims) {
+      String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
+      PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
+      if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
+        victims.put(slaveId, victim);
+      } else {
+        victims.remove(slaveId, victim);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
new file mode 100644
index 0000000..f1b075a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * A {@link Runnable} that searches for preemption slots on behalf of every PENDING task group
+ * that is eligible for preemption, caching each slot it finds.
+ */
+@VisibleForTesting
+public class PendingTaskProcessor implements Runnable {
+  private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+      new Function<HostOffer, String>() {
+        @Override
+        public String apply(HostOffer hostOffer) {
+          return hostOffer.getOffer().getSlaveId().getValue();
+        }
+      };
+
+  private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY =
+      new Function<IAssignedTask, TaskGroupKey>() {
+        @Override
+        public TaskGroupKey apply(IAssignedTask assigned) {
+          return TaskGroupKey.from(assigned.getTask());
+        }
+      };
+
+  private final Storage storage;
+  private final OfferManager offerManager;
+  private final PreemptionVictimFilter preemptionVictimFilter;
+  private final PreemptorMetrics metrics;
+  private final Amount<Long, Time> preemptionCandidacyDelay;
+  private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private final ClusterState clusterState;
+  private final Clock clock;
+
+  /** Matches tasks whose latest event is at least the candidacy delay in the past. */
+  private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      long elapsedMs = clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp();
+      return elapsedMs >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+    }
+  };
+
+  /** Matches tasks whose task group already has at least one proposal in the slot cache. */
+  private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      TaskGroupKey groupKey = TaskGroupKey.from(task.getAssignedTask().getTask());
+      return !slotCache.getByValue(groupKey).isEmpty();
+    }
+  };
+
+  /**
+   * Binding annotation for the time interval after which a pending task becomes eligible to
+   * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING
+   * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible
+   * to preempt other tasks.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PreemptionDelay { }
+
+  @Inject
+  PendingTaskProcessor(
+      Storage storage,
+      OfferManager offerManager,
+      PreemptionVictimFilter preemptionVictimFilter,
+      PreemptorMetrics metrics,
+      @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+      BiCache<PreemptionProposal, TaskGroupKey> slotCache,
+      ClusterState clusterState,
+      Clock clock) {
+
+    this.storage = requireNonNull(storage);
+    this.offerManager = requireNonNull(offerManager);
+    this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
+    this.metrics = requireNonNull(metrics);
+    this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
+    this.slotCache = requireNonNull(slotCache);
+    this.clusterState = requireNonNull(clusterState);
+    this.clock = requireNonNull(clock);
+  }
+
+  /**
+   * Runs one search pass inside a storage read.
+   * <p>
+   * Every pending task group is matched against the known slaves until a preemption slot is
+   * found. Groups are visited in the interleaved order produced by
+   * {@link #getPreemptionSequence(Multiset)} (e.g.: G1, G2, G3, G1, G2). A slave is withdrawn
+   * from further matching as soon as a reservation is made on it. When no slave yields a
+   * proposal for a group, every remaining instance of that group is dropped from the pass.
+   */
+  @Override
+  public void run() {
+    metrics.recordTaskProcessorRun();
+    storage.read(new Storage.Work.Quiet<Void>() {
+      @Override
+      public Void apply(StoreProvider store) {
+        Multimap<String, PreemptionVictim> activeTasksBySlave =
+            clusterState.getSlavesToActiveTasks();
+        if (activeTasksBySlave.isEmpty()) {
+          // Nothing is running, hence nothing can be preempted.
+          return null;
+        }
+
+        // Index offers by slave so each can be paired with the tasks active on the same slave.
+        Map<String, HostOffer> offersBySlave =
+            Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+        Set<String> openSlaves = Sets.newHashSet(
+            Iterables.concat(offersBySlave.keySet(), activeTasksBySlave.keySet()));
+
+        LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store);
+        List<TaskGroupKey> remainingGroups = fetchIdlePendingGroups(store);
+
+        // The consuming iterator removes each group from the list as it is handed out, so the
+        // list itself tracks the outstanding work and keeps its order when a group is purged.
+        Iterator<TaskGroupKey> cursor = Iterators.consumingIterator(remainingGroups.iterator());
+        while (!remainingGroups.isEmpty()) {
+          TaskGroupKey group = cursor.next();
+          boolean reserved =
+              reserveSlot(group, openSlaves, activeTasksBySlave, offersBySlave, jobStates, store);
+          if (!reserved) {
+            // No slave can host this group -> drop its other instances and restart iteration.
+            remainingGroups.removeAll(ImmutableSet.of(group));
+            cursor = Iterators.consumingIterator(remainingGroups.iterator());
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Tries each open slave in turn until one yields preemption victims for the group, then caches
+   * the resulting proposal and withdraws that slave from {@code openSlaves}.
+   *
+   * @param group Task group to find a slot for.
+   * @param openSlaves Slaves still available for reservation; mutated when a slot is found.
+   * @param activeTasksBySlave Active tasks keyed by slave ID.
+   * @param offersBySlave Outstanding offers keyed by slave ID.
+   * @param jobStates Per-job attribute aggregates, loaded on demand.
+   * @param store Store provider backing the current read.
+   * @return {@code true} if a slot was found and cached for the group.
+   */
+  private boolean reserveSlot(
+      TaskGroupKey group,
+      Set<String> openSlaves,
+      Multimap<String, PreemptionVictim> activeTasksBySlave,
+      Map<String, HostOffer> offersBySlave,
+      LoadingCache<IJobKey, AttributeAggregate> jobStates,
+      StoreProvider store) {
+
+    ITaskConfig task = group.getTask();
+    metrics.recordPreemptionAttemptFor(task);
+
+    for (Iterator<String> slaves = openSlaves.iterator(); slaves.hasNext();) {
+      String slaveId = slaves.next();
+      Optional<ImmutableSet<PreemptionVictim>> victims =
+          preemptionVictimFilter.filterPreemptionVictims(
+              task,
+              activeTasksBySlave.get(slaveId),
+              jobStates.getUnchecked(task.getJob()),
+              Optional.fromNullable(offersBySlave.get(slaveId)),
+              store);
+
+      metrics.recordSlotSearchResult(victims, task);
+      if (victims.isPresent()) {
+        // Slot found -> withdraw the slave so it is not reserved for a second task.
+        slaves.remove();
+        slotCache.put(new PreemptionProposal(victims.get(), slaveId), group);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Fetches PENDING tasks that are idle and have no cached slot, and returns their task groups
+   * in preemption order.
+   */
+  private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) {
+    Predicate<IScheduledTask> needsSlot =
+        Predicates.and(isIdleTask, Predicates.not(hasCachedSlot));
+    Function<IScheduledTask, TaskGroupKey> toGroupKey =
+        Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED);
+
+    Multiset<TaskGroupKey> groupCounts = HashMultiset.create(
+        FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING)))
+            .filter(needsSlot)
+            .transform(toGroupKey));
+
+    return getPreemptionSequence(groupCounts);
+  }
+
+  /**
+   * Creates execution sequence for pending task groups by interleaving their unique occurrences.
+   * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}.
+   *
+   * @param groups Multiset of task groups.
+   * @return A task group execution sequence.
+   */
+  private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) {
+    Set<TaskGroupKey> distinctGroups = ImmutableSet.copyOf(groups.elementSet());
+    Multiset<TaskGroupKey> remaining = HashMultiset.create(groups);
+    List<TaskGroupKey> sequence = Lists.newLinkedList();
+
+    // Each sweep emits at most one occurrence of every distinct group that still has some left.
+    while (!remaining.isEmpty()) {
+      for (TaskGroupKey candidate : distinctGroups) {
+        if (remaining.remove(candidate)) {
+          sequence.add(candidate);
+        }
+      }
+    }
+
+    return sequence;
+  }
+
+  /**
+   * Builds a cache that loads a job's {@link AttributeAggregate} from the given store on first
+   * access.
+   */
+  private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) {
+    Function<IJobKey, AttributeAggregate> loadJobState =
+        new Function<IJobKey, AttributeAggregate>() {
+          @Override
+          public AttributeAggregate apply(IJobKey jobKey) {
+            return AttributeAggregate.getJobActiveState(store, jobKey);
+          }
+        };
+    return CacheBuilder.newBuilder().build(CacheLoader.from(loadJobState));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java
new file mode 100644
index 0000000..d598b02
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A set of tasks proposed for preemption on a given slave.
+ */
+class PreemptionProposal {
+  private final Set<PreemptionVictim> victims;
+  private final String slaveId;
+
+  PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) {
+    this.victims = requireNonNull(victims);
+    this.slaveId = requireNonNull(slaveId);
+  }
+
+  Set<PreemptionVictim> getVictims() {
+    return victims;
+  }
+
+  String getSlaveId() {
+    return slaveId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PreemptionProposal)) {
+      return false;
+    }
+
+    PreemptionProposal other = (PreemptionProposal) o;
+    return Objects.equals(getVictims(), other.getVictims())
+        && Objects.equals(getSlaveId(), other.getSlaveId());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(victims, slaveId);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("victims", getVictims())
+        .add("slaveId", getSlaveId())
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
new file mode 100644
index 0000000..a93fa22
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * A victim to be considered as a candidate for preemption.
+ */
+public final class PreemptionVictim {
+  private final String slaveHost;
+  private final boolean production;
+  private final String role;
+  private final int priority;
+  private final Resources resources;
+  private final String taskId;
+
+  private PreemptionVictim(
+      String slaveHost,
+      boolean production,
+      String role,
+      int priority,
+      Resources resources,
+      String taskId) {
+
+    this.slaveHost = slaveHost;
+    this.production = production;
+    this.role = role;
+    this.priority = priority;
+    this.resources = resources;
+    this.taskId = taskId;
+  }
+
+  public static PreemptionVictim fromTask(IAssignedTask task) {
+    ITaskConfig config = task.getTask();
+    return new PreemptionVictim(
+        task.getSlaveHost(),
+        config.isProduction(),
+        config.getJob().getRole(),
+        config.getPriority(),
+        Resources.from(task.getTask()),
+        task.getTaskId());
+  }
+
+  public String getSlaveHost() {
+    return slaveHost;
+  }
+
+  public boolean isProduction() {
+    return production;
+  }
+
+  public String getRole() {
+    return role;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public Resources getResources() {
+    return resources;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PreemptionVictim)) {
+      return false;
+    }
+
+    PreemptionVictim other = (PreemptionVictim) o;
+    return Objects.equals(getSlaveHost(), other.getSlaveHost())
+        && Objects.equals(isProduction(), other.isProduction())
+        && Objects.equals(getRole(), other.getRole())
+        && Objects.equals(getPriority(), other.getPriority())
+        && Objects.equals(getResources(), other.getResources())
+        && Objects.equals(getTaskId(), other.getTaskId());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(slaveHost, production, role, priority, resources, taskId);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("slaveHost", getSlaveHost())
+        .add("production", isProduction())
+        .add("role", getRole())
+        .add("priority", getPriority())
+        .add("resources", getResources())
+        .add("taskId", getTaskId())
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
new file mode 100644
index 0000000..4293415
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Filters active tasks (victims) and available offer (slack) resources that can accommodate a
+ * given task (candidate), provided victims are preempted.
+ * <p>
+ * A task may preempt another task if the following conditions hold true:
+ * <ol>
+ *  <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy
+ *    the candidate.
+ *  </li>
+ *  <li>Both candidate and victim are owned by the same user and the
+ *    {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the
+ *    candidate is production.
+ *  </li>
+ * </ol>
+ */
+public interface PreemptionVictimFilter {
+  /**
+   * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted.
+   *
+   * @param pendingTask Task to search preemption slot for.
+   * @param victims Active tasks on a slave.
+   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
+   * @param offer A resource offer for a slave.
+   * @param storeProvider A store provider to access task data.
+   * @return A set of {@code PreemptionVictim} instances to preempt for a given task.
+   */
+  Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
+      ITaskConfig pendingTask,
+      Iterable<PreemptionVictim> victims,
+      AttributeAggregate attributeAggregate,
+      Optional<HostOffer> offer,
+      StoreProvider storeProvider);
+
+  class PreemptionVictimFilterImpl implements PreemptionVictimFilter {
+    private final SchedulingFilter schedulingFilter;
+    private final ExecutorSettings executorSettings;
+    private final PreemptorMetrics metrics;
+
+    @Inject
+    PreemptionVictimFilterImpl(
+        SchedulingFilter schedulingFilter,
+        ExecutorSettings executorSettings,
+        PreemptorMetrics metrics) {
+
+      this.schedulingFilter = requireNonNull(schedulingFilter);
+      this.executorSettings = requireNonNull(executorSettings);
+      this.metrics = requireNonNull(metrics);
+    }
+
+    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<HostOffer, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(HostOffer offer) {
+            return ResourceSlot.from(offer.getOffer());
+          }
+        };
+
+    private static final Function<HostOffer, String> OFFER_TO_HOST =
+        new Function<HostOffer, String>() {
+          @Override
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getHostname();
+          }
+        };
+
+    private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
+        new Function<PreemptionVictim, String>() {
+          @Override
+          public String apply(PreemptionVictim victim) {
+            return victim.getSlaveHost();
+          }
+        };
+
+    private final Function<PreemptionVictim, ResourceSlot> victimToResources =
+        new Function<PreemptionVictim, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(PreemptionVictim victim) {
+            return ResourceSlot.from(victim, executorSettings);
+          }
+        };
+
+    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+    // ordering
+    private final Ordering<PreemptionVictim> resourceOrder =
+        ResourceSlot.ORDER.onResultOf(victimToResources).reverse();
+
+    @Override
+    public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
+        ITaskConfig pendingTask,
+        Iterable<PreemptionVictim> possibleVictims,
+        AttributeAggregate jobState,
+        Optional<HostOffer> offer,
+        StoreProvider storeProvider) {
+
+      // This enforces the precondition that all of the resources are from the same host. We need to
+      // get the host for the schedulingFilter.
+      Set<String> hosts = ImmutableSet.<String>builder()
+          .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
+          .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build();
+
+      ResourceSlot slackResources =
+          ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT));
+
+      FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
+          .filter(preemptionFilter(pendingTask));
+
+      if (preemptableTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
+
+      Iterable<PreemptionVictim> sortedVictims =
+          resourceOrder.immutableSortedCopy(preemptableTasks);
+
+      Optional<IHostAttributes> attributes =
+          storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts));
+
+      if (!attributes.isPresent()) {
+        metrics.recordMissingAttributes();
+        return Optional.absent();
+      }
+
+      for (PreemptionVictim victim : sortedVictims) {
+        toPreemptTasks.add(victim);
+
+        ResourceSlot totalResource = ResourceSlot.sum(
+            ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
+            slackResources);
+
+        Set<Veto> vetoes = schedulingFilter.filter(
+            new UnusedResource(totalResource, attributes.get()),
+            new ResourceRequest(pendingTask, jobState));
+
+        if (vetoes.isEmpty()) {
+          return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
+        }
+      }
+      return Optional.absent();
+    }
+
+    /**
+     * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt.
+     *
+     * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
+     * @return A filter that will compare the priorities and resources required by other tasks
+     *     with {@code preemptableTask}.
+     */
+    private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
+      return new Predicate<PreemptionVictim>() {
+        @Override
+        public boolean apply(PreemptionVictim possibleVictim) {
+          boolean pendingIsProduction = pendingTask.isProduction();
+          boolean victimIsProduction = possibleVictim.isProduction();
+
+          if (pendingIsProduction && !victimIsProduction) {
+            return true;
+          } else if (pendingIsProduction == victimIsProduction) {
+            // If production flags are equal, preemption is based on priority within the same role.
+            if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
+              return pendingTask.getPriority() > possibleVictim.getPriority();
+            } else {
+              return false;
+            }
+          } else {
+            return false;
+          }
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
new file mode 100644
index 0000000..7d2903a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.mesos.Protos.SlaveID;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+
+/**
+ * Preempts active tasks in favor of a PENDING task, provided a preemption slot was previously
+ * found and cached for the task's group.
+ */
+public interface Preemptor {
+  /**
+   * Preempts victim tasks in case a valid preemption slot exists.
+   *
+   * @param task Preempting task.
+   * @param jobState Current job state aggregate.
+   * @param storeProvider Store provider to use for task preemption.
+   * @return ID of the slave where preemption occurred.
+   */
+  Optional<String> attemptPreemptionFor(
+      IAssignedTask task,
+      AttributeAggregate jobState,
+      MutableStoreProvider storeProvider);
+
+  class PreemptorImpl implements Preemptor {
+    private final StateManager stateManager;
+    private final OfferManager offerManager;
+    private final PreemptionVictimFilter preemptionVictimFilter;
+    private final PreemptorMetrics metrics;
+    private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+
+    @Inject
+    PreemptorImpl(
+        StateManager stateManager,
+        OfferManager offerManager,
+        PreemptionVictimFilter preemptionVictimFilter,
+        PreemptorMetrics metrics,
+        BiCache<PreemptionProposal, TaskGroupKey> slotCache) {
+
+      this.stateManager = requireNonNull(stateManager);
+      this.offerManager = requireNonNull(offerManager);
+      this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
+      this.metrics = requireNonNull(metrics);
+      this.slotCache = requireNonNull(slotCache);
+    }
+
+    @Override
+    public Optional<String> attemptPreemptionFor(
+        IAssignedTask pendingTask,
+        AttributeAggregate jobState,
+        MutableStoreProvider store) {
+
+      TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
+      Set<PreemptionProposal> cachedSlots = slotCache.getByValue(groupKey);
+      if (cachedSlots.isEmpty()) {
+        // No preemption slot has been found for this task group.
+        return Optional.absent();
+      }
+
+      // Claim the next cached slot; it is removed from the cache whether or not it validates.
+      PreemptionProposal proposal = cachedSlots.iterator().next();
+      slotCache.remove(proposal, groupKey);
+
+      // The slot was computed earlier, so re-run the filter against the slave's current offer
+      // to confirm the proposed victims still make room for the task.
+      SlaveID slaveId = SlaveID.newBuilder().setValue(proposal.getSlaveId()).build();
+      Optional<ImmutableSet<PreemptionVictim>> confirmedVictims =
+          preemptionVictimFilter.filterPreemptionVictims(
+              pendingTask.getTask(),
+              proposal.getVictims(),
+              jobState,
+              offerManager.getOffer(slaveId),
+              store);
+
+      metrics.recordSlotValidationResult(confirmedVictims);
+      if (!confirmedVictims.isPresent()) {
+        // Previously found victims are no longer valid -> let the next run find a new slot.
+        return Optional.absent();
+      }
+
+      for (PreemptionVictim victim : confirmedVictims.get()) {
+        metrics.recordTaskPreemption(victim);
+        stateManager.changeState(
+            store,
+            victim.getTaskId(),
+            Optional.absent(),
+            PREEMPTING,
+            Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
+      }
+      return Optional.of(proposal.getSlaveId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
new file mode 100644
index 0000000..30bb814
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Defines methods to manage preemptor metrics.
+ */
+@VisibleForTesting
+public class PreemptorMetrics {
+  @VisibleForTesting
+  static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes";
+
+  @VisibleForTesting
+  static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs";
+
+  private volatile boolean exported = false;
+  private final CachedCounters counters;
+
+  @Inject
+  PreemptorMetrics(CachedCounters counters) {
+    this.counters = requireNonNull(counters);
+    assertFullyExported();
+  }
+
+  private static String prod(boolean production) {
+    return production ? "prod" : "non_prod";
+  }
+
+  private static String result(boolean success) {
+    return success ? "successful" : "failed";
+  }
+
+  private void assertFullyExported() {
+    if (exported) {
+      return;
+    }
+
+    // Dummy-read all stats to ensure they are exported.
+    Set<String> allStats = ImmutableSet.of(
+        attemptsStatName(false),
+        attemptsStatName(true),
+        successStatName(false),
+        successStatName(true),
+        slotSearchStatName(true, false),
+        slotSearchStatName(false, false),
+        slotSearchStatName(true, true),
+        slotSearchStatName(false, true),
+        slotValidationStatName(true),
+        slotValidationStatName(false),
+        MISSING_ATTRIBUTES_NAME,
+        TASK_PROCESSOR_RUN_NAME);
+    for (String stat : allStats) {
+      counters.get(stat);
+    }
+
+    exported = true;
+  }
+
+  private void increment(String stat) {
+    assertFullyExported();
+    counters.get(stat).incrementAndGet();
+  }
+
+  @VisibleForTesting
+  static String attemptsStatName(boolean production) {
+    return "preemptor_slot_search_attempts_for_" + prod(production);
+  }
+
+  @VisibleForTesting
+  static String successStatName(boolean production) {
+    return "preemptor_tasks_preempted_" + prod(production);
+  }
+
+  @VisibleForTesting
+  static String slotSearchStatName(boolean success, boolean production) {
+    return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production));
+  }
+
+  @VisibleForTesting
+  static String slotValidationStatName(boolean success) {
+    return "preemptor_slot_validation_" + result(success);
+  }
+
+  void recordPreemptionAttemptFor(ITaskConfig task) {
+    increment(attemptsStatName(task.isProduction()));
+  }
+
+  void recordTaskPreemption(PreemptionVictim victim) {
+    increment(successStatName(victim.isProduction()));
+  }
+
+  void recordSlotSearchResult(Optional<?> result, ITaskConfig task) {
+    increment(slotSearchStatName(result.isPresent(), task.isProduction()));
+  }
+
+  void recordSlotValidationResult(Optional<?> result) {
+    increment(slotValidationStatName(result.isPresent()));
+  }
+
+  void recordMissingAttributes() {
+    increment(MISSING_ATTRIBUTES_NAME);
+  }
+
+  void recordTaskProcessorRun() {
+    increment(TASK_PROCESSOR_RUN_NAME);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
new file mode 100644
index 0000000..fc39a6d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Guice module for the preemptor.
+ *
+ * <p>When the preemptor is enabled, its collaborators are bound inside a private module and only
+ * {@link Preemptor}, {@link ClusterStateImpl}, {@link PreemptorService} and
+ * {@link PendingTaskProcessor} are exposed. When it is disabled, {@link Preemptor} is bound to a
+ * no-op implementation that never finds a slot.
+ */
+public class PreemptorModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
+
+  @CmdLine(name = "enable_preemptor",
+      help = "Enable the preemptor and preemption")
+  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+  @CmdLine(name = "preemption_delay",
+      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
+  @CmdLine(name = "preemption_slot_hold_time",
+      help = "Time to hold a preemption slot found before it is discarded.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "preemption_slot_search_interval",
+      help = "Time interval between pending task preemption slot searches.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  private final boolean enablePreemptor;
+  private final Amount<Long, Time> preemptionDelay;
+  private final Amount<Long, Time> slotSearchInterval;
+
+  /**
+   * Creates a module from explicit settings rather than command line arguments.
+   *
+   * <p>Note that the slot hold time is not a parameter here: {@link #configure()} always reads it
+   * from the {@code preemption_slot_hold_time} argument.
+   *
+   * @param enablePreemptor Whether the real preemptor should be bound.
+   * @param preemptionDelay Value bound for {@link PendingTaskProcessor.PreemptionDelay}.
+   * @param slotSearchInterval Period of the schedule bound for {@link PreemptorService}.
+   */
+  @VisibleForTesting
+  public PreemptorModule(
+      boolean enablePreemptor,
+      Amount<Long, Time> preemptionDelay,
+      Amount<Long, Time> slotSearchInterval) {
+
+    this.enablePreemptor = enablePreemptor;
+    this.preemptionDelay = requireNonNull(preemptionDelay);
+    this.slotSearchInterval = requireNonNull(slotSearchInterval);
+  }
+
+  /**
+   * Creates a module configured from the command line arguments declared in this class.
+   */
+  public PreemptorModule() {
+    this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get());
+  }
+
+  @Override
+  protected void configure() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (enablePreemptor) {
+          LOG.info("Preemptor Enabled.");
+          bind(PreemptorMetrics.class).in(Singleton.class);
+          bind(PreemptionVictimFilter.class)
+              .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
+          bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
+          bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
+          bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
+          bind(new TypeLiteral<Amount<Long, Time>>() { })
+              .annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
+              .toInstance(preemptionDelay);
+          // The hold time comes straight from the command line argument, even when this module
+          // was built through the testing constructor.
+          bind(BiCacheSettings.class).toInstance(
+              new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size"));
+          bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { })
+              .in(Singleton.class);
+          bind(PendingTaskProcessor.class).in(Singleton.class);
+          bind(ClusterState.class).to(ClusterStateImpl.class);
+          bind(ClusterStateImpl.class).in(Singleton.class);
+          expose(ClusterStateImpl.class);
+
+          bind(PreemptorService.class).in(Singleton.class);
+          // Fixed-rate schedule: no initial delay, then one run every slotSearchInterval.
+          bind(AbstractScheduledService.Scheduler.class).toInstance(
+              AbstractScheduledService.Scheduler.newFixedRateSchedule(
+                  0L,
+                  slotSearchInterval.getValue(),
+                  slotSearchInterval.getUnit().getTimeUnit()));
+
+          expose(PreemptorService.class);
+          expose(PendingTaskProcessor.class);
+        } else {
+          bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+          LOG.warning("Preemptor Disabled.");
+        }
+        // Preemptor is exposed in both branches so dependents can always inject it.
+        expose(Preemptor.class);
+      }
+    });
+
+    // We can't do this in the private module due to the known conflict between multibindings
+    // and private modules due to multiple injectors.  We accept the added complexity here to keep
+    // the other bindings private.
+    // NOTE(review): this subscriber binding is not guarded by enablePreemptor, although
+    // ClusterStateImpl is only bound and exposed above when the preemptor is enabled - confirm
+    // this is intended.
+    PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
+    if (enablePreemptor) {
+      SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+          .to(PreemptorService.class);
+    }
+  }
+
+  /**
+   * Scheduled service that invokes {@link PendingTaskProcessor#run()} once per iteration, on the
+   * injected schedule.
+   */
+  static class PreemptorService extends AbstractScheduledService {
+    private final PendingTaskProcessor slotFinder;
+    private final Scheduler schedule;
+
+    @Inject
+    PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) {
+      this.slotFinder = requireNonNull(slotFinder);
+      this.schedule = requireNonNull(schedule);
+    }
+
+    @Override
+    protected void runOneIteration() {
+      slotFinder.run();
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+      return schedule;
+    }
+  }
+
+  // Stand-in bound when the preemptor is disabled: ignores its arguments and always reports
+  // that nothing was preempted.
+  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+    @Override
+    public Optional<String> attemptPreemptionFor(
+        IAssignedTask task,
+        AttributeAggregate jobState,
+        Storage.MutableStoreProvider storeProvider) {
+
+      return Optional.absent();
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
new file mode 100644
index 0000000..023e0cf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.pruning;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Prunes per-job update history on a periodic basis.
+ */
+class JobUpdateHistoryPruner extends AbstractIdleService {
+  private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName());
+
+  private final Clock clock;
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final HistoryPrunerSettings settings;
+
+  static class HistoryPrunerSettings {
+    private final Amount<Long, Time> pruneInterval;
+    private final Amount<Long, Time> maxHistorySize;
+    private final int maxUpdatesPerJob;
+
+    HistoryPrunerSettings(
+        Amount<Long, Time> pruneInterval,
+        Amount<Long, Time> maxHistorySize,
+        int maxUpdatesPerJob) {
+
+      this.pruneInterval = requireNonNull(pruneInterval);
+      this.maxHistorySize = requireNonNull(maxHistorySize);
+      this.maxUpdatesPerJob = maxUpdatesPerJob;
+    }
+  }
+
+  @Inject
+  JobUpdateHistoryPruner(
+      Clock clock,
+      ScheduledExecutorService executor,
+      Storage storage,
+      HistoryPrunerSettings settings) {
+
+    this.clock = requireNonNull(clock);
+    this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
+    this.settings = requireNonNull(settings);
+  }
+
+  @Override
+  protected void startUp() {
+    executor.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            storage.write(new MutateWork.NoResult.Quiet() {
+              @Override
+              public void execute(MutableStoreProvider storeProvider) {
+                Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
+                    settings.maxUpdatesPerJob,
+                    clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
+
+                LOG.info(prunedUpdates.isEmpty()
+                    ? "No job update history to prune."
+                    : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
+              }
+            });
+          }
+        },
+        settings.pruneInterval.as(Time.MILLISECONDS),
+        settings.pruneInterval.as(Time.MILLISECONDS),
+        TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  protected void shutDown() {
+    // Nothing to do - await VM shutdown.
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
new file mode 100644
index 0000000..373c833
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.pruning;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
+
+/**
+ * Binding module for background storage pruning.
+ *
+ * <p>Installs two private modules: one for {@link TaskHistoryPruner}, which is also registered as
+ * a pubsub subscriber, and one for {@link JobUpdateHistoryPruner}, which is also registered as a
+ * scheduler-active service. Each private module exposes only its pruner.
+ */
+public class PruningModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(PruningModule.class.getName());
+
+  @CmdLine(name = "history_prune_threshold",
+      help = "Time after which the scheduler will prune terminated task history.")
+  private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
+      Arg.create(Amount.of(2L, Time.DAYS));
+
+  @CmdLine(name = "history_max_per_job_threshold",
+      help = "Maximum number of terminated tasks to retain in a job history.")
+  private static final Arg<Integer> HISTORY_MAX_PER_JOB_THRESHOLD = Arg.create(100);
+
+  @CmdLine(name = "history_min_retention_threshold",
+      help = "Minimum guaranteed time for task history retention before any pruning is attempted.")
+  private static final Arg<Amount<Long, Time>> HISTORY_MIN_RETENTION_THRESHOLD =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "job_update_history_per_job_threshold",
+      help = "Maximum number of completed job updates to retain in a job update history.")
+  private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
+
+  @CmdLine(name = "job_update_history_pruning_interval",
+      help = "Job update history pruning interval.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL =
+      Arg.create(Amount.of(15L, Time.MINUTES));
+
+  @CmdLine(name = "job_update_history_pruning_threshold",
+      help = "Time after which the scheduler will prune completed job update history.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD =
+      Arg.create(Amount.of(30L, Time.DAYS));
+
+  @Override
+  protected void configure() {
+    // Task history pruning: the settings stay private, only the pruner itself is exposed.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        // TODO(ksweeney): Create a configuration validator module so this can be injected.
+        // TODO(William Farner): Revert this once large task counts are cheap ala hierarchical store
+        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
+            HISTORY_PRUNE_THRESHOLD.get(),
+            HISTORY_MIN_RETENTION_THRESHOLD.get(),
+            HISTORY_MAX_PER_JOB_THRESHOLD.get()
+        ));
+
+        bind(TaskHistoryPruner.class).in(Singleton.class);
+        expose(TaskHistoryPruner.class);
+      }
+    });
+    // Registered on the outer binder, using the pruner exposed by the private module above.
+    PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
+
+    // Job update history pruning: the settings and the single-threaded executor created here are
+    // bound inside the private module and are not exposed.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
+            new JobUpdateHistoryPruner.HistoryPrunerSettings(
+                JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),
+                JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(),
+                JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get()));
+
+        bind(ScheduledExecutorService.class).toInstance(
+            AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG));
+
+        bind(JobUpdateHistoryPruner.class).in(Singleton.class);
+        expose(JobUpdateHistoryPruner.class);
+      }
+    });
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+        .to(JobUpdateHistoryPruner.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
new file mode 100644
index 0000000..ef88d98
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.pruning;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
+ * transitioning into one of the inactive states.
+ *
+ * <p>Each terminated task triggers two actions: a delayed deletion of that task, and an immediate
+ * check of whether its job holds more terminated tasks than the per-job history goal.
+ */
+public class TaskHistoryPruner implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
+
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+  private final Clock clock;
+  private final HistoryPrunnerSettings settings;
+  private final Storage storage;
+
+  // Accepts a task only when its latest event is at least the minimum retention threshold old.
+  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      return Tasks.getLatestEvent(task).getTimestamp()
+          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
+    }
+  };
+
+  /**
+   * Pruning parameters, with the time amounts converted to milliseconds at construction.
+   */
+  static class HistoryPrunnerSettings {
+    private final long pruneThresholdMillis;
+    private final long minRetentionThresholdMillis;
+    private final int perJobHistoryGoal;
+
+    HistoryPrunnerSettings(
+        Amount<Long, Time> inactivePruneThreshold,
+        Amount<Long, Time> minRetentionThreshold,
+        int perJobHistoryGoal) {
+
+      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
+      this.perJobHistoryGoal = perJobHistoryGoal;
+    }
+  }
+
+  @Inject
+  TaskHistoryPruner(
+      @AsyncExecutor ScheduledExecutorService executor,
+      StateManager stateManager,
+      Clock clock,
+      HistoryPrunnerSettings settings,
+      Storage storage) {
+
+    this.executor = requireNonNull(executor);
+    this.stateManager = requireNonNull(stateManager);
+    this.clock = requireNonNull(clock);
+    this.settings = requireNonNull(settings);
+    this.storage = requireNonNull(storage);
+  }
+
+  /**
+   * Computes how long to wait before pruning a task.
+   *
+   * <p>The result is the prune threshold minus the time already elapsed since the given
+   * timestamp (a timestamp in the future counts as zero elapsed time), but never less than the
+   * minimum retention threshold.
+   *
+   * @param taskEventTimestampMillis Timestamp of the task event the timeout is based on.
+   * @return Delay in milliseconds.
+   */
+  @VisibleForTesting
+  long calculateTimeout(long taskEventTimestampMillis) {
+    return Math.max(
+        settings.minRetentionThresholdMillis,
+        settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
+  }
+
+  /**
+   * When triggered, records an inactive task state change.
+   *
+   * <p>Changes to a non-terminated state are ignored. The timeout is based on the current time
+   * when the change is a transition, and on the task's last recorded event otherwise.
+   *
+   * @param change Event when a task changes state.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    if (Tasks.isTerminated(change.getNewState())) {
+      long timeoutBasis = change.isTransition()
+          ? clock.nowMillis()
+          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
+          change.getTaskId(),
+          calculateTimeout(timeoutBasis));
+    }
+  }
+
+  /**
+   * Deletes the given tasks through the state manager within a single storage write.
+   *
+   * @param taskIds IDs of the tasks to delete.
+   */
+  private void deleteTasks(final Set<String> taskIds) {
+    LOG.info("Pruning inactive tasks " + taskIds);
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        stateManager.deleteTasks(storeProvider, taskIds);
+      }
+    });
+  }
+
+  /**
+   * Builds the query selecting all tasks of a job that are in a terminal state.
+   *
+   * @param jobKey Job to scope the query to.
+   * @return Query builder for the job's terminal tasks.
+   */
+  @VisibleForTesting
+  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
+    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
+  }
+
+  /**
+   * Schedules pruning for a task that has become inactive.
+   *
+   * <p>Two pieces of work are handed to the executor: a deletion of {@code taskId} delayed by
+   * {@code timeRemaining}, and an immediate job-level check that trims the job's terminated tasks
+   * down towards the per-job history goal.
+   *
+   * @param jobKey Job the task belongs to.
+   * @param taskId ID of the inactive task.
+   * @param timeRemaining Delay in milliseconds before the task itself is deleted.
+   */
+  private void registerInactiveTask(
+      final IJobKey jobKey,
+      final String taskId,
+      long timeRemaining) {
+
+    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
+    executor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            LOG.info("Pruning expired inactive task " + taskId);
+            deleteTasks(ImmutableSet.of(taskId));
+          }
+        },
+        timeRemaining,
+        TimeUnit.MILLISECONDS);
+
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        Iterable<IScheduledTask> inactiveTasks =
+            Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
+        int numInactiveTasks = Iterables.size(inactiveTasks);
+        int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
+        // A positive surplus already implies the job holds more tasks than the goal.
+        if (tasksToPrune > 0) {
+          // Candidates are taken in Tasks.LATEST_ACTIVITY order (presumably least recently
+          // active first - confirm), skipping tasks still inside the minimum retention window.
+          Set<String> toPrune = FluentIterable
+              .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+              .filter(safeToDelete)
+              .limit(tasksToPrune)
+              .transform(Tasks.SCHEDULED_TO_ID)
+              .toSet();
+          // Every surplus task may still be too young to delete; skip the storage write then.
+          if (!toPrune.isEmpty()) {
+            deleteTasks(toPrune);
+          }
+        }
+      }
+    });
+  }
+}


Mime
View raw message