aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [33/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/Log.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/Log.java b/src/main/java/org/apache/aurora/scheduler/log/Log.java
new file mode 100644
index 0000000..c1036e0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/Log.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.log;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Represents an append only log that can be read after and truncated before a known
+ * {@link Position}.
+ *
+ * <p>Logs are accessed by {@link #open() opening} a {@link Stream}.  All stream
+ * access occurs with references to log entry {@link Position positions} in the stream.
+ */
+public interface Log {
+
+  /**
+   * An opaque ordered handle to a log entry's position in the log stream.
+   */
+  interface Position extends Comparable<Position> {
+  }
+
+  /**
+   * Represents a single entry in the log stream.
+   */
+  interface Entry {
+
+    /**
+     * Gets the contents of the log entry.
+     *
+     * @return the data stored in this log entry
+     */
+    byte[] contents();
+  }
+
+  /**
+   * An interface to the live {@link Log} stream that allows for appending, reading and writing
+   * entries.
+   */
+  interface Stream extends Closeable {
+
+    /**
+     * Indicates a {@link Position} that is not (currently) contained in this log stream.
+     * This might indicate the position id from a different log or that the position was from this
+     * log but has been truncated.
+     */
+    class InvalidPositionException extends RuntimeException {
+      public InvalidPositionException(String message) {
+        super(message);
+      }
+      public InvalidPositionException(String message, Throwable cause) {
+        super(message, cause);
+      }
+    }
+
+    /**
+     * Indicates a {@link Stream} could not be read from, written to or truncated due to some
+     * underlying IO error.
+     */
+    class StreamAccessException extends RuntimeException {
+      public StreamAccessException(String message, Throwable cause) {
+        super(message, cause);
+      }
+    }
+
+    /**
+     * Appends an {@link Entry} to the end of the log stream.
+     *
+     * @param contents the data to store in the appended entry
+     * @return the position of the appended entry
+     * @throws StreamAccessException if contents could not be appended to the stream
+     */
+    Position append(byte[] contents) throws StreamAccessException;
+
+    /**
+     * Allows reading all entries from the log stream.  Implementations may
+     * materialize all entries at once or the may provide some form of streaming to back the
+     * returned entry iterator.  If the implementation does use some form of streaming or batching,
+     * it may throw a
+     * {@code StreamAccessException} on any call to {@link Iterator#hasNext()} or
+     * {@link Iterator#next()}.
+     *
+     * @return an iterator that ranges from the first entry in the log to the last
+     *     entry in the log.
+     * @throws InvalidPositionException if a position read during iteration does not exist in this
+     *     log
+     * @throws StreamAccessException if the stream could not be read from
+     */
+    Iterator<Entry> readAll() throws InvalidPositionException, StreamAccessException;
+
+    /**
+     * Removes all log entries preceding the log entry at the given {@code position}.
+     *
+     * @param position the position of the first entry to retain; all entries before it are
+     *     removed
+     * @throws InvalidPositionException if the specified position does not exist in this log
+     * @throws StreamAccessException if the stream could not be truncated
+     */
+    void truncateBefore(Position position) throws InvalidPositionException, StreamAccessException;
+  }
+
+  /**
+   * Opens the log stream for reading, writing and truncation.  Clients should ensure the stream is
+   * closed when they are done using it.
+   *
+   * @return the log stream
+   * @throws IOException if there was a problem opening the log stream
+   */
+  Stream open() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/mesos/LogInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/LogInterface.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/LogInterface.java
new file mode 100644
index 0000000..9ae4685
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/LogInterface.java
@@ -0,0 +1,42 @@
+package com.twitter.aurora.scheduler.log.mesos;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.mesos.Log;
+import org.apache.mesos.Log.Entry;
+import org.apache.mesos.Log.Position;
+
+/**
+ * An interface for {@link Log}, since the mesos Java API doesn't provide one.
+ * <p>
+ * This is needed because a static initializer that loads a native library, prevents us from
+ * mocking the Mesos Log API in tests. These wrapper interfaces and their corresponding
+ * implementations that delegate the calls to the underlying Mesos Log objects will help
+ * us mock the Log API.
+ * <p>
+ * TODO(Suman Karumuri): Remove this interface after https://issues.apache.org/jira/browse/MESOS-796
+ * is resolved.
+ */
+interface LogInterface {
+
+  /**
+   * Returns a {@link Position} for the given opaque identity blob (as produced by
+   * {@code Position.identity()}).
+   *
+   * @param identity the opaque identity of a position
+   * @return the corresponding position in this log
+   */
+  Position position(byte[] identity);
+
+  /**
+   * Mirrors the read-side API of {@code org.apache.mesos.Log.Reader}.
+   */
+  interface ReaderInterface {
+    /**
+     * Reads the entries in the inclusive range {@code [from, to]}, waiting up to the given
+     * timeout for the read to complete.
+     */
+    List<Entry> read(Position from, Position to, long timeout, TimeUnit unit)
+        throws TimeoutException, Log.OperationFailedException;
+
+    /** Returns the current beginning position of the log. */
+    Position beginning();
+
+    /** Returns the current ending position of the log. */
+    Position ending();
+  }
+
+  /**
+   * Mirrors the write-side API of {@code org.apache.mesos.Log.Writer}.
+   */
+  interface WriterInterface {
+    /**
+     * Appends {@code data} as a new entry at the end of the log, waiting up to the given
+     * timeout, and returns the appended entry's position.
+     */
+    Position append(byte[] data, long timeout, TimeUnit unit)
+        throws TimeoutException, Log.WriterFailedException;
+
+    /**
+     * Truncates the log before the given position, waiting up to the given timeout.
+     */
+    Position truncate(Position to, long timeout, TimeUnit unit)
+        throws TimeoutException, Log.WriterFailedException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
new file mode 100644
index 0000000..d625734
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.log.mesos;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.primitives.Longs;
+import com.google.inject.BindingAnnotation;
+
+import org.apache.mesos.Log;
+
+import com.twitter.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
+import com.twitter.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
+import com.twitter.common.base.Function;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.stats.Stats;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * A {@code Log} implementation backed by a true distributed log in mesos core.
+ */
+public class MesosLog implements com.twitter.aurora.scheduler.log.Log {
+
+  private static final Logger LOG = Logger.getLogger(MesosLog.class.getName());
+
+  /**
+   * Binding annotation for the opaque value of a log noop entry.
+   */
+  @BindingAnnotation
+  @Retention(RUNTIME)
+  @Target({ PARAMETER, METHOD })
+  public @interface NoopEntry { }
+
+  /**
+   * Binding annotation for log read timeouts.
+   */
+  @BindingAnnotation
+  @Retention(RUNTIME)
+  @Target({ PARAMETER, METHOD })
+  public @interface ReadTimeout { }
+
+  /**
+   * Binding annotation for log write timeouts - used for truncates and appends.
+   */
+  @BindingAnnotation
+  @Retention(RUNTIME)
+  @Target({ PARAMETER, METHOD })
+  public @interface WriteTimeout { }
+
+  private final Provider<LogInterface> logFactory;
+
+  private final Provider<ReaderInterface> readerFactory;
+  private final Amount<Long, Time> readTimeout;
+
+  private final Provider<WriterInterface> writerFactory;
+  private final Amount<Long, Time> writeTimeout;
+
+  private final byte[] noopEntry;
+
+  /**
+   * Creates a new mesos log.
+   *
+   * @param logFactory Factory to provide access to log.
+   * @param readerFactory Factory to provide access to log readers.
+   * @param readTimeout Log read timeout.
+   * @param writerFactory Factory to provide access to log writers.
+   * @param writeTimeout Log write timeout.
+   * @param noopEntry A no-op log entry blob.
+   */
+  @Inject
+  public MesosLog(
+      Provider<LogInterface> logFactory,
+      Provider<ReaderInterface> readerFactory,
+      @ReadTimeout Amount<Long, Time> readTimeout,
+      Provider<WriterInterface> writerFactory,
+      @WriteTimeout Amount<Long, Time> writeTimeout,
+      @NoopEntry byte[] noopEntry) {
+
+    this.logFactory = Preconditions.checkNotNull(logFactory);
+
+    this.readerFactory = Preconditions.checkNotNull(readerFactory);
+    // Null-checked for consistency with the other injected dependencies.
+    this.readTimeout = Preconditions.checkNotNull(readTimeout);
+
+    this.writerFactory = Preconditions.checkNotNull(writerFactory);
+    this.writeTimeout = Preconditions.checkNotNull(writeTimeout);
+
+    this.noopEntry = Preconditions.checkNotNull(noopEntry);
+  }
+
+  @Override
+  public Stream open() {
+    return new LogStream(
+        logFactory.get(), readerFactory.get(), readTimeout, writerFactory, writeTimeout, noopEntry);
+  }
+
+  @VisibleForTesting
+  static class LogStream implements com.twitter.aurora.scheduler.log.Log.Stream {
+    /**
+     * Holds per-operation metrics: a sliding timing stat plus timeout/failure counters,
+     * all exported under the scheduler_log_native_ prefix.
+     */
+    @VisibleForTesting
+    static final class OpStats {
+      private final String opName;
+      private final SlidingStats timing;
+      private final AtomicLong timeouts;
+      private final AtomicLong failures;
+
+      OpStats(String opName) {
+        this.opName = MorePreconditions.checkNotBlank(opName);
+        timing = new SlidingStats("scheduler_log_native_" + opName, "nanos");
+        timeouts = exportLongStat("scheduler_log_native_%s_timeouts", opName);
+        failures = exportLongStat("scheduler_log_native_%s_failures", opName);
+      }
+
+      private static AtomicLong exportLongStat(String template, Object... args) {
+        return Stats.exportLong(String.format(template, args));
+      }
+    }
+
+    private static final Function<Log.Entry, LogEntry> MESOS_ENTRY_TO_ENTRY =
+        new Function<Log.Entry, LogEntry>() {
+          @Override public LogEntry apply(Log.Entry entry) {
+            return new LogEntry(entry);
+          }
+        };
+
+    private final OpStats read = new OpStats("read");
+    private final OpStats append = new OpStats("append");
+    private final OpStats truncate = new OpStats("truncate");
+    // NOTE(review): "native" appears twice in this stat name - possibly a typo, but left
+    // unchanged since exported stat names may be consumed by external monitoring.
+    private final AtomicLong entriesSkipped =
+        Stats.exportLong("scheduler_log_native_native_entries_skipped");
+
+    private final LogInterface log;
+
+    private final ReaderInterface reader;
+    private final long readTimeout;
+    private final TimeUnit readTimeUnit;
+
+    private final Provider<WriterInterface> writerFactory;
+    private final long writeTimeout;
+    private final TimeUnit writeTimeUnit;
+
+    private final byte[] noopEntry;
+
+    // Lazily created by mutate() and discarded on write failure to force re-election.
+    private WriterInterface writer;
+
+    LogStream(LogInterface log, ReaderInterface reader, Amount<Long, Time> readTimeout,
+        Provider<WriterInterface> writerFactory, Amount<Long, Time> writeTimeout,
+        byte[] noopEntry) {
+
+      this.log = log;
+
+      this.reader = reader;
+      this.readTimeout = readTimeout.getValue();
+      this.readTimeUnit = readTimeout.getUnit().getTimeUnit();
+
+      this.writerFactory = writerFactory;
+      this.writeTimeout = writeTimeout.getValue();
+      this.writeTimeUnit = writeTimeout.getUnit().getTimeUnit();
+
+      this.noopEntry = noopEntry;
+    }
+
+    @Override
+    public Iterator<Entry> readAll() throws StreamAccessException {
+      // TODO(John Sirois): Currently we must be the coordinator to ensure we get the 'full read'
+      // of log entries expected by the users of the com.twitter.aurora.scheduler.log.Log interface.
+      // Switch to another method of ensuring this when it becomes available in mesos' log
+      // interface.
+      try {
+        append(noopEntry);
+      } catch (StreamAccessException e) {
+        throw new StreamAccessException("Error writing noop prior to a read", e);
+      }
+
+      final Log.Position from = reader.beginning();
+      final Log.Position to = end().unwrap();
+
+      // Reading all the entries at once may cause large garbage collections. Instead, we
+      // lazily read the entries one by one as they are requested.
+      // TODO(Benjamin Hindman): Eventually replace this functionality with functionality
+      // from the Mesos Log.
+      return new UnmodifiableIterator<Entry>() {
+        private long position = Longs.fromByteArray(from.identity());
+        private final long endPosition = Longs.fromByteArray(to.identity());
+        private Entry entry = null;
+
+        @Override
+        public boolean hasNext() {
+          if (entry != null) {
+            return true;
+          }
+
+          while (position <= endPosition) {
+            long start = System.nanoTime();
+            try {
+              Log.Position p = log.position(Longs.toByteArray(position));
+              if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Reading position " + position + " from the log");
+              }
+              List<Log.Entry> entries = reader.read(p, p, readTimeout, readTimeUnit);
+
+              // N.B. HACK! There is currently no way to "increment" a position. Until the Mesos
+              // Log actually provides a way to "stream" the log, we approximate as much by
+              // using longs via Log.Position.identity and Log.position.
+              position++;
+
+              // Reading positions in this way means it's possible that we get an "invalid" entry
+              // (e.g., in the underlying log terminology this would be anything but an append)
+              // which will be removed from the returned entries resulting in an empty list.
+              // We skip these.
+              if (entries.isEmpty()) {
+                entriesSkipped.getAndIncrement();
+              } else {
+                entry = MESOS_ENTRY_TO_ENTRY.apply(Iterables.getOnlyElement(entries));
+                return true;
+              }
+            } catch (TimeoutException e) {
+              read.timeouts.getAndIncrement();
+              throw new StreamAccessException("Timeout reading from log.", e);
+            } catch (Log.OperationFailedException e) {
+              read.failures.getAndIncrement();
+              throw new StreamAccessException("Problem reading from log", e);
+            } finally {
+              read.timing.accumulate(System.nanoTime() - start);
+            }
+          }
+          return false;
+        }
+
+        @Override
+        public Entry next() {
+          if (entry == null && !hasNext()) {
+            throw new NoSuchElementException();
+          }
+
+          Entry result = Preconditions.checkNotNull(entry);
+          entry = null;
+          return result;
+        }
+      };
+    }
+
+    @Override
+    public LogPosition append(final byte[] contents) throws StreamAccessException {
+      Preconditions.checkNotNull(contents);
+
+      Log.Position position = mutate(append, new Mutation<Log.Position>() {
+        @Override public Log.Position apply(WriterInterface logWriter)
+            throws TimeoutException, Log.WriterFailedException {
+          return logWriter.append(contents, writeTimeout, writeTimeUnit);
+        }
+      });
+      return LogPosition.wrap(position);
+    }
+
+    @Timed("scheduler_log_native_truncate_before")
+    @Override
+    public void truncateBefore(com.twitter.aurora.scheduler.log.Log.Position position)
+        throws StreamAccessException {
+
+      Preconditions.checkArgument(position instanceof LogPosition);
+
+      final Log.Position before = ((LogPosition) position).unwrap();
+      mutate(truncate, new Mutation<Void>() {
+        @Override public Void apply(WriterInterface logWriter)
+            throws TimeoutException, Log.WriterFailedException {
+          logWriter.truncate(before, writeTimeout, writeTimeUnit);
+          return null;
+        }
+      });
+    }
+
+    /**
+     * A single write operation against the underlying log writer.
+     */
+    @VisibleForTesting
+    interface Mutation<T> {
+      T apply(WriterInterface writer) throws TimeoutException, Log.WriterFailedException;
+    }
+
+    /**
+     * Applies {@code mutation} with the current writer (creating one on first use), recording
+     * timing and failure stats.  Synchronized so the cached writer is created and discarded
+     * atomically with respect to concurrent mutations.
+     */
+    @VisibleForTesting
+    synchronized <T> T mutate(OpStats stats, Mutation<T> mutation) {
+      long start = System.nanoTime();
+      if (writer == null) {
+        writer = writerFactory.get();
+      }
+      try {
+        return mutation.apply(writer);
+      } catch (TimeoutException e) {
+        stats.timeouts.getAndIncrement();
+        throw new StreamAccessException("Timeout performing log " + stats.opName, e);
+      } catch (Log.WriterFailedException e) {
+        stats.failures.getAndIncrement();
+
+        // We must throw away a writer on any write failure - this could be because of a coordinator
+        // election in which case we must trigger a new election.
+        writer = null;
+
+        // Trailing space added so the message reads "...log append" rather than "...logappend",
+        // matching the timeout message above.
+        throw new StreamAccessException("Problem performing log " + stats.opName, e);
+      } finally {
+        stats.timing.accumulate(System.nanoTime() - start);
+      }
+    }
+
+    private LogPosition end() {
+      return LogPosition.wrap(reader.ending());
+    }
+
+    @Override
+    public void close() {
+      // noop
+    }
+
+    private static class LogPosition implements com.twitter.aurora.scheduler.log.Log.Position {
+      private final Log.Position underlying;
+
+      LogPosition(Log.Position underlying) {
+        this.underlying = underlying;
+      }
+
+      static LogPosition wrap(Log.Position position) {
+        return new LogPosition(position);
+      }
+
+      Log.Position unwrap() {
+        return underlying;
+      }
+
+      @Override public int compareTo(Position o) {
+        Preconditions.checkArgument(o instanceof LogPosition);
+        return underlying.compareTo(((LogPosition) o).underlying);
+      }
+    }
+
+    private static class LogEntry implements com.twitter.aurora.scheduler.log.Log.Entry {
+      private final Log.Entry underlying;
+
+      public LogEntry(Log.Entry entry) {
+        this.underlying = entry;
+      }
+
+      @Override
+      public byte[] contents() {
+        return underlying.data;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
new file mode 100644
index 0000000..7063cb7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.log.mesos;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.mesos.Log;
+import org.apache.zookeeper.common.PathUtils;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
+import com.twitter.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.net.InetSocketAddressHelper;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+
+/**
+ * Binds a native mesos Log implementation.
+ *
+ * <p>Exports the following bindings:
+ * <ul>
+ *   <li>{@link Log} - a log backed by the mesos native distributed log</li>
+ * </ul>
+ */
+public class MesosLogStreamModule extends PrivateModule {
+  @CmdLine(name = "native_log_quorum_size",
+           help = "The size of the quorum required for all log mutations.")
+  private static final Arg<Integer> QUORUM_SIZE = Arg.create(1);
+
+  @CmdLine(name = "native_log_file_path",
+           help = "Path to a file to store the native log data in.  If the parent directory does"
+               + " not exist it will be created.")
+  private static final Arg<File> LOG_PATH = Arg.create(null);
+
+  @CmdLine(name = "native_log_zk_group_path",
+           help = "A zookeeper node for use by the native log to track the master coordinator.")
+  private static final Arg<String> ZK_LOG_GROUP_PATH = Arg.create(null);
+
+  /*
+   * This timeout includes the time to get a quorum to promise leadership to the coordinator and
+   * the time to fill any holes in the coordinator's log.
+   */
+  @CmdLine(name = "native_log_election_timeout",
+           help = "The timeout for a single attempt to obtain a new log writer.")
+  private static final Arg<Amount<Long, Time>> COORDINATOR_ELECTION_TIMEOUT =
+      Arg.create(Amount.of(15L, Time.SECONDS));
+
+  /*
+   * Normally retries would not be expected to help much - however in the small replica set where
+   * a few down replicas doom a coordinator election attempt, retrying effectively gives us a wider
+   * window in which to await a live quorum before giving up and thrashing the global election
+   * process.  Observed log replica recovery times as of 4/6/2012 can be ~45 seconds so giving a
+   * window >= 2x this should support 1-round election events (that possibly use several retries in
+   * the single round).
+   */
+  @CmdLine(name = "native_log_election_retries",
+           help = "The maximum number of attempts to obtain a new log writer.")
+  private static final Arg<Integer> COORDINATOR_ELECTION_RETRIES = Arg.create(20);
+
+  @CmdLine(name = "native_log_read_timeout",
+           help = "The timeout for doing log reads.")
+  private static final Arg<Amount<Long, Time>> READ_TIMEOUT =
+      Arg.create(Amount.of(5L, Time.SECONDS));
+
+  @CmdLine(name = "native_log_write_timeout",
+           help = "The timeout for doing log appends and truncations.")
+  private static final Arg<Amount<Long, Time>> WRITE_TIMEOUT =
+      Arg.create(Amount.of(3L, Time.SECONDS));
+
+  private final ClientConfig zkClientConfig;
+
+  /**
+   * Creates a module that will bind the native log against the given zookeeper configuration.
+   *
+   * @param zkClientConfig ZooKeeper connection configuration for coordinator tracking.
+   */
+  public MesosLogStreamModule(ClientConfig zkClientConfig) {
+    this.zkClientConfig = Preconditions.checkNotNull(zkClientConfig);
+  }
+
+  @Override
+  protected void configure() {
+    bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(MesosLog.ReadTimeout.class)
+        .toInstance(READ_TIMEOUT.get());
+    bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(MesosLog.WriteTimeout.class)
+        .toInstance(WRITE_TIMEOUT.get());
+
+    bind(com.twitter.aurora.scheduler.log.Log.class).to(MesosLog.class);
+    bind(MesosLog.class).in(Singleton.class);
+    expose(com.twitter.aurora.scheduler.log.Log.class);
+  }
+
+  @Provides
+  @Singleton
+  Log provideLog() {
+    File logPath = LOG_PATH.get();
+    File parentDir = logPath.getParentFile();
+    // getParentFile() returns null for single-component relative paths; there is no
+    // directory to create in that case.
+    if (parentDir != null && !parentDir.exists() && !parentDir.mkdirs()) {
+      // NOTE(review): addError is documented for use during configure(); calling it from a
+      // @Provides method looks suspect - confirm Guice surfaces this as intended.
+      addError("Failed to create parent directory to store native log at: %s", parentDir);
+    }
+
+    String zkConnectString = Joiner.on(',').join(
+        Iterables.transform(zkClientConfig.servers, InetSocketAddressHelper.INET_TO_STR));
+
+    PathUtils.validatePath(ZK_LOG_GROUP_PATH.get());
+    return new Log(
+        QUORUM_SIZE.get(),
+        logPath.getAbsolutePath(),
+        zkConnectString,
+        zkClientConfig.sessionTimeout.getValue(),
+        zkClientConfig.sessionTimeout.getUnit().getTimeUnit(),
+        ZK_LOG_GROUP_PATH.get(),
+        zkClientConfig.credentials.scheme(),
+        zkClientConfig.credentials.authToken());
+  }
+
+  @Provides
+  Log.Reader provideReader(Log log) {
+    return new Log.Reader(log);
+  }
+
+  @Provides
+  Log.Writer provideWriter(Log log) {
+    Amount<Long, Time> electionTimeout = COORDINATOR_ELECTION_TIMEOUT.get();
+    return new Log.Writer(log, electionTimeout.getValue(), electionTimeout.getUnit().getTimeUnit(),
+        COORDINATOR_ELECTION_RETRIES.get());
+  }
+
+  // The following providers adapt the concrete mesos Log classes to the mockable
+  // LogInterface wrappers (see LogInterface's TODO for the upstream issue).
+  @Provides
+  LogInterface provideLogInterface(final Log log) {
+    return new LogInterface() {
+      @Override public Log.Position position(byte[] identity) {
+        return log.position(identity);
+      }
+    };
+  }
+
+  @Provides
+  ReaderInterface provideReaderInterface(final Log.Reader reader) {
+    return new ReaderInterface() {
+      @Override
+      public List<Log.Entry> read(Log.Position from, Log.Position to, long timeout, TimeUnit unit)
+          throws TimeoutException, Log.OperationFailedException {
+
+        return reader.read(from, to, timeout, unit);
+      }
+
+      @Override public Log.Position beginning() {
+        return reader.beginning();
+      }
+
+      @Override public Log.Position ending() {
+        return reader.ending();
+      }
+    };
+  }
+
+  @Provides
+  WriterInterface provideWriterInterface(final Log.Writer writer) {
+    return new WriterInterface() {
+      @Override public Log.Position append(byte[] data, long timeout, TimeUnit unit)
+          throws TimeoutException, Log.WriterFailedException {
+        return writer.append(data, timeout, unit);
+      }
+
+      @Override public Log.Position truncate(Log.Position to, long timeout, TimeUnit unit)
+          throws TimeoutException, Log.WriterFailedException {
+        return writer.truncate(to, timeout, unit);
+      }
+    };
+  }
+
+  @Provides
+  @Singleton
+  @MesosLog.NoopEntry
+  byte[] provideNoopEntry() throws ThriftBinaryCodec.CodingException {
+    return ThriftBinaryCodec.encodeNonNull(LogEntry.noop(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/testing/FileLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/testing/FileLog.java b/src/main/java/org/apache/aurora/scheduler/log/testing/FileLog.java
new file mode 100644
index 0000000..3dff4cf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/testing/FileLog.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.log.testing;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.test.FileLogContents;
+import com.twitter.aurora.gen.test.LogRecord;
+import com.twitter.aurora.scheduler.log.Log;
+import com.twitter.common.base.Closure;
+
+/**
+ * A log implementation that reads from and writes to a local file.
+ * <p>
+ * This should never be used in a production setting; it is only intended for local testing.
+ * TODO(wfarner): Bind/inject a settable flag that indicates we are running with test settings.
+ * Surface this in a banner on the web UI.
+ */
+class FileLog implements Log {
+
+  // Backing file; the entire log (a thrift-serialized FileLogContents) is rewritten
+  // to this file on every mutation.
+  private final File logFile;
+
+  @Inject
+  FileLog(File logFile) {
+    this.logFile = Preconditions.checkNotNull(logFile);
+  }
+
+  /**
+   * Opens the log file, deserializing any existing contents, and returns a stream backed
+   * by an in-memory record map that is flushed to disk on every append/truncate.
+   */
+  @Override
+  public Stream open() throws IOException {
+    try {
+      FileLogContents logContents;
+      // Treat an empty file as a new file.
+      // NOTE: we can't use logFile.length == 0 to test for an empty file, since empty unicode files
+      // may still contain a multi-byte byte order mark (BOM) header, making the length non-zero.
+      if (logFile.createNewFile()
+          || Files.readFirstLine(logFile, Charset.defaultCharset()) == null) {
+        logContents = new FileLogContents(Maps.<Long, LogRecord>newHashMap());
+      } else {
+        logContents = ThriftBinaryCodec.decode(FileLogContents.class, Files.toByteArray(logFile));
+      }
+      // Persists the full log contents; any I/O or encoding failure is rethrown unchecked.
+      Closure<FileLogContents> logWriter = new Closure<FileLogContents>() {
+        @Override public void execute(FileLogContents logContents) {
+          try {
+            Files.write(ThriftBinaryCodec.encode(logContents), logFile);
+          } catch (IOException | CodingException e) {
+            throw Throwables.propagate(e);
+          }
+        }
+      };
+      return new FileStream(logContents, logWriter);
+    } catch (CodingException e) {
+      throw new IOException("Failed to interpret log contents: " + e, e);
+    }
+  }
+
+  private static class FileStream implements Stream {
+    private final FileLogContents logContents;
+    private final Closure<FileLogContents> logWriter;
+    // Position assigned to the next appended record: 1 for a fresh log, otherwise one
+    // past the highest existing record position.
+    private long nextPosition;
+
+    FileStream(FileLogContents logContents, Closure<FileLogContents> logWriter) {
+      this.logContents = logContents;
+      this.logWriter = logWriter;
+      nextPosition = logContents.getRecords().isEmpty()
+          ? 1
+          : Ordering.natural().max(logContents.getRecords().keySet()) + 1;
+    }
+
+    // Appends a record and synchronously rewrites the entire log file.
+    @Override
+    public Position append(byte[] contents) throws StreamAccessException {
+      logContents.getRecords().put(nextPosition, new LogRecord(ByteBuffer.wrap(contents)));
+      Position position = new CounterPosition(nextPosition);
+      logWriter.execute(logContents);
+      nextPosition++;
+      return position;
+    }
+
+    private static final Function<LogRecord, Entry> TO_ENTRY = new Function<LogRecord, Entry>() {
+      @Override public Entry apply(final LogRecord record) {
+        return new Entry() {
+          @Override public byte[] contents() {
+            return record.getContents();
+          }
+        };
+      }
+    };
+
+    // Returns all records ordered by ascending position.
+    @Override
+    public Iterator<Entry> readAll() throws InvalidPositionException, StreamAccessException {
+      return FluentIterable.from(Ordering.natural().sortedCopy(logContents.getRecords().keySet()))
+          .transform(Functions.forMap(logContents.getRecords()))
+          .transform(TO_ENTRY)
+          .iterator();
+    }
+
+    // Discards every record strictly before the given position, then persists.
+    @Override
+    public void truncateBefore(Position position)
+        throws InvalidPositionException, StreamAccessException {
+
+      if (!(position instanceof CounterPosition)) {
+        throw new InvalidPositionException("Unrecognized position " + position);
+      }
+
+      final long truncateBefore = ((CounterPosition) position).value;
+      Iterables.removeIf(logContents.getRecords().keySet(), new Predicate<Long>() {
+        @Override public boolean apply(Long recordPosition) {
+          return recordPosition < truncateBefore;
+        }
+      });
+      logWriter.execute(logContents);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // No-op.
+    }
+
+    // Position backed by a monotonically-increasing long counter.
+    private static class CounterPosition implements Position {
+      private final long value;
+
+      CounterPosition(long value) {
+        this.value = value;
+      }
+
+      // NOTE(review): unchecked cast -- comparing against a foreign Position type would
+      // throw ClassCastException rather than InvalidPositionException.
+      @Override
+      public int compareTo(Position position) {
+        return Longs.compare(value, ((CounterPosition) position).value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/log/testing/FileLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/testing/FileLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/testing/FileLogStreamModule.java
new file mode 100644
index 0000000..5ef3e8e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/log/testing/FileLogStreamModule.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.log.testing;
+
+import java.io.File;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.PrivateModule;
+
+import com.twitter.aurora.scheduler.log.Log;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+
+/**
+ * Binding module that uses a local log file, intended for testing.
+ */
+public class FileLogStreamModule extends PrivateModule {
+
+  // TODO(William Farner): Make this a required argument and ensure it is not included in production
+  //                       builds (MESOS-471).
+  //@NotNull
+  @CmdLine(name = "testing_log_file_path", help = "Path to a file to store local log file data in.")
+  private static final Arg<File> LOG_PATH = Arg.create(null);
+
+  @Override
+  protected void configure() {
+    // Fail fast at module installation time since @NotNull is disabled above.
+    Preconditions.checkNotNull(LOG_PATH.get());
+    bind(File.class).toInstance(LOG_PATH.get());
+    bind(Log.class).to(FileLog.class);
+    bind(FileLog.class).in(Singleton.class);
+    // Only the Log interface escapes this private module.
+    expose(Log.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/metadata/MetadataModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/MetadataModule.java b/src/main/java/org/apache/aurora/scheduler/metadata/MetadataModule.java
new file mode 100644
index 0000000..08377ae
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/MetadataModule.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.metadata;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+
+/**
+ * Binding module for scheduler metadata management.
+ */
+public class MetadataModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    // Singleton so all pubsub events mutate one shared nearest-fit cache.
+    bind(NearestFit.class).in(Singleton.class);
+    // Register NearestFit to receive scheduler pubsub events (state changes, vetoes).
+    PubsubEventModule.bindSubscriber(binder(), NearestFit.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
new file mode 100644
index 0000000..47c1f5c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.metadata;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Ticker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Vetoed;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Tracks vetoes against scheduling decisions and maintains the closest fit among all the vetoes
+ * for a task.
+ */
+public class NearestFit implements EventSubscriber {
+  // Window after which a task's recorded fit is evicted, bounding cache growth for
+  // tasks that are no longer being actively scheduled.
+  @VisibleForTesting
+  static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES);
+
+  @VisibleForTesting
+  static final ImmutableSet<Veto> NO_VETO = ImmutableSet.of();
+
+  // Task ID -> closest-fit vetoes observed so far; entries expire EXPIRATION after
+  // their last write.
+  private final LoadingCache<String, Fit> fitByTask;
+
+  @VisibleForTesting
+  NearestFit(Ticker ticker) {
+    fitByTask = CacheBuilder.newBuilder()
+        .expireAfterWrite(EXPIRATION.getValue(), EXPIRATION.getUnit().getTimeUnit())
+        .ticker(ticker)
+        .build(new CacheLoader<String, Fit>() {
+          @Override public Fit load(String taskId) {
+            return new Fit();
+          }
+        });
+  }
+
+  @Inject
+  NearestFit() {
+    this(Ticker.systemTicker());
+  }
+
+  /**
+   * Gets the vetoes that represent the nearest fit for the given task.
+   *
+   * @param taskId The task to look up.
+   * @return The nearest fit vetoes for the given task.  This will return an empty set if
+   *         no vetoes have been recorded for the task.
+   */
+  public synchronized ImmutableSet<Veto> getNearestFit(String taskId) {
+    Fit fit = fitByTask.getIfPresent(taskId);
+    return (fit == null) ? NO_VETO : fit.vetoes;
+  }
+
+  /**
+   * Records a task deletion event, dropping any fit state for the deleted tasks.
+   *
+   * @param deletedEvent Task deleted event.
+   */
+  @Subscribe
+  public synchronized void remove(TasksDeleted deletedEvent) {
+    fitByTask.invalidateAll(Tasks.ids(deletedEvent.getTasks()));
+  }
+
+  /**
+   * Records a task state change event.
+   * This will ignore any events where the previous state is not {@link ScheduleStatus#PENDING}.
+   *
+   * @param stateChangeEvent Task state change.
+   */
+  @Subscribe
+  public synchronized void stateChanged(TaskStateChange stateChangeEvent) {
+    // Once a task leaves PENDING it is no longer awaiting scheduling, so its fit
+    // record is stale and can be dropped.
+    if (stateChangeEvent.getOldState() == ScheduleStatus.PENDING) {
+      fitByTask.invalidate(stateChangeEvent.getTaskId());
+    }
+  }
+
+  private static final Predicate<Veto> IS_CONSTRAINT_MISMATCH = new Predicate<Veto>() {
+    @Override public boolean apply(Veto veto) {
+      return veto.isConstraintMismatch();
+    }
+  };
+
+  /**
+   * Records a task veto event, updating the stored nearest fit if the new veto set
+   * represents a closer fit than what is currently recorded.  Veto sets containing
+   * constraint mismatches (see {@link Veto#isConstraintMismatch()}) are not ignored,
+   * but are considered worse fits than sets without them.
+   *
+   * @param vetoEvent Veto event.
+   */
+  @Subscribe
+  public synchronized void vetoed(Vetoed vetoEvent) {
+    Preconditions.checkNotNull(vetoEvent);
+    fitByTask.getUnchecked(vetoEvent.getTaskId()).maybeUpdate(vetoEvent.getVetoes());
+  }
+
+  private static class Fit {
+    // Current best (closest) veto set; null until the first update.
+    private ImmutableSet<Veto> vetoes;
+
+    // Aggregate score of a veto set; lower means a closer fit.
+    private static int score(Iterable<Veto> vetoes) {
+      int total = 0;
+      for (Veto veto : vetoes) {
+        total += veto.getScore();
+      }
+      return total;
+    }
+
+    private void update(Iterable<Veto> newVetoes) {
+      vetoes = ImmutableSet.copyOf(newVetoes);
+    }
+
+    /**
+     * Updates the nearest fit if the provided vetoes represents a closer fit than the current
+     * best fit.
+     * <p>
+     * There are two classes of vetoes: those with and without constraint mismatches. A set of
+     * vetoes without a constraint mismatch is always a better fit than a set with constraint
+     * mismatches.
+     * <p>
+     * If two sets are equivalent in that they do or do not have constraint mismatches, they are
+     * compared by the following criteria:
+     * <ul>
+     *   <li> the one with fewer vetoes is a better fit, irrespective of scores
+     *   <li> if the veto count is equal, the one with the smaller aggregate score is a better fit
+     * </ul>
+     *
+     * @param newVetoes The vetoes for the most recent scheduling attempt.
+     */
+    void maybeUpdate(Set<Veto> newVetoes) {
+      // First observation always wins.
+      if (vetoes == null) {
+        update(newVetoes);
+        return;
+      }
+
+      boolean valueMismatchOld = Iterables.any(vetoes, IS_CONSTRAINT_MISMATCH);
+      boolean valueMismatchNew = Iterables.any(newVetoes, IS_CONSTRAINT_MISMATCH);
+      if (valueMismatchOld == valueMismatchNew) {
+        // Same mismatch class: prefer fewer vetoes, then lower aggregate score.
+        if (newVetoes.size() < vetoes.size()) {
+          update(newVetoes);
+        } else if ((newVetoes.size() == vetoes.size()) && (score(newVetoes) < score(vetoes))) {
+          update(newVetoes);
+        }
+      } else if (valueMismatchOld) {
+        // New set has no constraint mismatch while the old one does: strictly better.
+        update(newVetoes);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
new file mode 100644
index 0000000..df15854
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/periodic/GcExecutorLauncher.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.periodic;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+import com.twitter.aurora.Protobufs;
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.comm.AdjustRetainedTasks;
+import com.twitter.aurora.scheduler.TaskLauncher;
+import com.twitter.aurora.scheduler.base.CommandUtil;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A task launcher that periodically initiates garbage collection on a host, re-using a single
+ * garbage collection executor.
+ */
+public class GcExecutorLauncher implements TaskLauncher {
+  private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
+
+  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
+
+  // Total resources claimed from an offer: executor resources plus the epsilon task.
+  @VisibleForTesting
+  static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
+      new Resources(0.2, Amount.of(128L, Data.MB), Amount.of(16L, Data.MB), 0);
+
+  // An epsilon is used because we are required to supply executor and task resources.
+  @VisibleForTesting
+  static final Resources EPSILON =
+      new Resources(0.01, Amount.of(1L, Data.MB), Amount.of(1L, Data.MB), 0);
+
+  private static final Resources GC_EXECUTOR_RESOURCES =
+      Resources.subtract(TOTAL_GC_EXECUTOR_RESOURCES, EPSILON);
+
+  // Task IDs created here carry this prefix so their status updates can be recognized.
+  private static final String SYSTEM_TASK_PREFIX = "system-gc-";
+  private static final String EXECUTOR_NAME = "aurora.gc";
+
+  private final GcExecutorSettings settings;
+  private final Storage storage;
+  private final Clock clock;
+  // Hostname -> wall-clock millis until which no further GC task should be launched on
+  // that host.  Entries also expire after the maximum GC interval as a backstop.
+  private final Cache<String, Long> pulses;
+
+  @Inject
+  GcExecutorLauncher(
+      GcExecutorSettings settings,
+      Storage storage,
+      Clock clock) {
+
+    this.settings = checkNotNull(settings);
+    this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+
+    this.pulses = CacheBuilder.newBuilder()
+        .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  /**
+   * Builds a GC task for the offered host, or returns absent when no GC executor path is
+   * configured, the offer lacks sufficient resources, or the host was GC'd too recently.
+   */
+  @Override
+  public Optional<TaskInfo> createTask(Offer offer) {
+    if (!settings.getGcExecutorPath().isPresent()
+        || !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
+        || isAlive(offer.getHostname())) {
+      return Optional.absent();
+    }
+
+    // Build the retained-tasks message: tasks the scheduler still knows about on this
+    // host, keyed by ID with their current status.
+    Set<IScheduledTask> tasksOnHost =
+        Storage.Util.weaklyConsistentFetchTasks(storage, Query.slaveScoped(offer.getHostname()));
+    AdjustRetainedTasks message = new AdjustRetainedTasks()
+        .setRetainedTasks(Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS));
+    byte[] data;
+    try {
+      data = ThriftBinaryCodec.encode(message);
+    } catch (CodingException e) {
+      LOG.severe("Failed to encode retained tasks message: " + message);
+      return Optional.absent();
+    }
+
+    tasksCreated.incrementAndGet();
+    // Suppress further GC tasks on this host until the randomized delay elapses.
+    pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
+
+    ExecutorInfo.Builder executor = ExecutorInfo.newBuilder()
+        .setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
+        .setName(EXECUTOR_NAME)
+        .setSource(offer.getHostname())
+        .addAllResources(GC_EXECUTOR_RESOURCES.toResourceList())
+        .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
+
+    return Optional.of(TaskInfo.newBuilder().setName("system-gc")
+        .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + UUID.randomUUID().toString()))
+        .setSlaveId(offer.getSlaveId())
+        .setData(ByteString.copyFrom(data))
+        .setExecutor(executor)
+        .addAllResources(EPSILON.toResourceList())
+        .build());
+  }
+
+  // Claims (returns true for) status updates belonging to GC tasks, identified by the
+  // task ID prefix; all other updates are left unclaimed.
+  @Override
+  public boolean statusUpdate(TaskStatus status) {
+    if (status.getTaskId().getValue().startsWith(SYSTEM_TASK_PREFIX)) {
+      LOG.info("Received status update for GC task: " + Protobufs.toString(status));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void cancelOffer(OfferID offer) {
+    // No-op.
+  }
+
+  // True when a GC task was recently created for this host and its pulse deadline has
+  // not yet passed.
+  private boolean isAlive(String hostname) {
+    Optional<Long> timestamp = Optional.fromNullable(pulses.getIfPresent(hostname));
+    return timestamp.isPresent() && clock.nowMillis() < timestamp.get();
+  }
+
+  /**
+   * Wraps configuration values for the {@code GcExecutorLauncher}.
+   */
+  public static class GcExecutorSettings {
+    private final Amount<Long, Time> gcInterval;
+    private final Optional<String> gcExecutorPath;
+    private final Random rand = new Random.SystemRandom(new java.util.Random());
+
+    public GcExecutorSettings(
+        Amount<Long, Time> gcInterval,
+        Optional<String> gcExecutorPath) {
+
+      this.gcInterval = checkNotNull(gcInterval);
+      this.gcExecutorPath = checkNotNull(gcExecutorPath);
+    }
+
+    @VisibleForTesting
+    long getMaxGcInterval() {
+      return gcInterval.as(Time.MILLISECONDS);
+    }
+
+    // Random delay in [0, gcInterval) millis; presumably randomized to stagger GC
+    // across hosts -- confirm with callers.
+    @VisibleForTesting
+    int getDelayMs() {
+      return rand.nextInt(gcInterval.as(Time.MILLISECONDS).intValue());
+    }
+
+    @VisibleForTesting
+    Optional<String> getGcExecutorPath() {
+      return gcExecutorPath;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
new file mode 100644
index 0000000..2ba2db8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.quota;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+/**
+ * Calculates and formats detailed quota comparison result.
+ */
+class QuotaComparisonResult {
+
+  // Resource dimensions that are compared, each carrying the unit shown in details.
+  private enum Resource {
+    CPU("core(s)"),
+    RAM("MB"),
+    DISK("MB");
+
+    private final String unit;
+
+    Resource(String unit) {
+      this.unit = unit;
+    }
+  }
+
+  enum Result {
+    SUFFICIENT_QUOTA,
+    INSUFFICIENT_QUOTA
+  }
+
+  private final String details;
+  private final Result result;
+
+  @VisibleForTesting
+  QuotaComparisonResult(Result result, String details) {
+    this.result = result;
+    this.details = details;
+  }
+
+  Result result() {
+    return result;
+  }
+
+  String details() {
+    return details;
+  }
+
+  // Compares available quota {@code a} against requested quota {@code b} on every
+  // resource dimension, accumulating a human-readable description of each deficit.
+  static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
+    StringBuilder details = new StringBuilder();
+    // Every dimension is evaluated unconditionally so that all deficits are reported,
+    // not just the first one encountered.
+    boolean cpuOk = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details);
+    boolean ramOk = compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details);
+    boolean diskOk = compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
+
+    Result result = (cpuOk && ramOk && diskOk)
+        ? Result.SUFFICIENT_QUOTA
+        : Result.INSUFFICIENT_QUOTA;
+    return new QuotaComparisonResult(result, details.toString());
+  }
+
+  // Returns whether a >= b; on failure, appends a "<RESOURCE> quota exceeded by
+  // <amount> <unit>" clause to details, separated from earlier clauses by "; ".
+  private static boolean compare(
+      double a,
+      double b,
+      Resource resource,
+      StringBuilder details) {
+
+    if (a >= b) {
+      return true;
+    }
+
+    if (details.length() > 0) {
+      details.append("; ");
+    }
+    details.append(resource)
+        .append(" quota exceeded by ")
+        .append(String.format("%.2f", b - a))
+        .append(" ")
+        .append(resource.unit);
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
new file mode 100644
index 0000000..cc67ba5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.quota;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.Iterables;
+
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
+import com.twitter.aurora.scheduler.state.JobFilter;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
+
+/**
+ * A filter that fails production jobs for roles that do not have sufficient quota to run them.
+ */
+class QuotaFilter implements JobFilter {
+  private final QuotaManagerImpl quotaManager;
+  private final Storage storage;
+
+  @Inject
+  QuotaFilter(QuotaManagerImpl quotaManager, Storage storage) {
+    this.quotaManager = checkNotNull(quotaManager);
+    this.storage = checkNotNull(storage);
+  }
+
+  @Override
+  public JobFilterResult filter(final IJobConfiguration job) {
+    return filterByTask(job.getKey(), job.getTaskConfig(), job.getInstanceCount());
+  }
+
+  @Override
+  public JobFilterResult filter(ITaskConfig template, int instanceCount) {
+    return filterByTask(JobKeys.from(template), template, instanceCount);
+  }
+
+  private synchronized JobFilterResult filterByTask(
+      IJobKey jobKey,
+      ITaskConfig template,
+      int instanceCount) {
+
+    // Quota is only enforced for production tasks.
+    if (!template.isProduction()) {
+      return JobFilterResult.pass();
+    }
+
+    // Resources already consumed by this job's active tasks.
+    IQuota consumed = Quotas.fromProductionTasks(
+        Iterables.transform(
+            Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()),
+            Tasks.SCHEDULED_TO_INFO));
+
+    // Net-new quota the proposed configuration would require beyond current consumption.
+    IQuota requested = Quotas.subtract(Quotas.fromTasks(template, instanceCount), consumed);
+    QuotaComparisonResult comparison = quotaManager.checkQuota(jobKey.getRole(), requested);
+
+    return (comparison.result() == INSUFFICIENT_QUOTA)
+        ? JobFilterResult.fail("Insufficient resource quota: " + comparison.details())
+        : JobFilterResult.pass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
new file mode 100644
index 0000000..a2abedb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.quota;
+
+import com.google.common.collect.Iterables;
+import com.google.inject.Inject;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.Storage.Work.Quiet;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Allows access to resource quotas, and tracks quota consumption.
+ */
+public interface QuotaManager {
+  /**
+   * Fetches the current resource usage for the role.
+   *
+   * @param role to fetch quota usage for.
+   * @return Resource quota used by {@code role}.
+   */
+  IQuota getConsumption(String role);
+
+  /**
+   * Quota provider that stores quotas in the canonical {@link Storage} system.
+   */
+  static class QuotaManagerImpl implements QuotaManager {
+    private final Storage storage;
+
+    @Inject
+    public QuotaManagerImpl(Storage storage) {
+      this.storage = checkNotNull(storage);
+    }
+
+    @Override
+    public IQuota getConsumption(final String role) {
+      checkNotBlank(role);
+
+      final Query.Builder query = Query.roleScoped(role).active();
+
+      return storage.consistentRead(
+          new Work.Quiet<IQuota>() {
+            @Override public IQuota apply(StoreProvider storeProvider) {
+              return Quotas.fromProductionTasks(Iterables.transform(
+                  storeProvider.getTaskStore().fetchTasks(query), Tasks.SCHEDULED_TO_INFO));
+            }
+          });
+    }
+
+    /**
+     * Tests whether the role has at least the specified amount of quota available.
+     *
+     * @param role Role to consume quota for.
+     * @param quota Quota amount to check for availability.
+     * @return QuotaComparisonResult with {@code result()} returning {@code true} if the role
+     * currently has at least {@code quota} quota remaining, {@code false} otherwise.
+     */
+    QuotaComparisonResult checkQuota(final String role, final IQuota quota) {
+      checkNotBlank(role);
+      checkNotNull(quota);
+
+      return storage.consistentRead(new Quiet<QuotaComparisonResult>() {
+        @Override public QuotaComparisonResult apply(StoreProvider storeProvider) {
+          IQuota reserved = storeProvider.getQuotaStore().fetchQuota(role).or(Quotas.noQuota());
+          return Quotas.greaterOrEqual(reserved, Quotas.add(getConsumption(role), quota));
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
new file mode 100644
index 0000000..63e8888
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.quota;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
+import com.twitter.aurora.scheduler.state.JobFilter;
+import com.twitter.aurora.scheduler.storage.Storage;
+
+/**
+ * Guice module for the quota package.
+ */
+public class QuotaModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    requireBinding(Storage.class);
+
+    bind(QuotaManager.class).to(QuotaManagerImpl.class);
+    bind(QuotaManagerImpl.class).in(Singleton.class);
+
+    bind(JobFilter.class).to(QuotaFilter.class);
+    bind(QuotaFilter.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
new file mode 100644
index 0000000..f7b6591
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.quota;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Convenience class for normalizing resource measures between tasks and offers.
+ */
+public final class Quotas {
+  private static final IQuota NO_QUOTA = IQuota.build(new Quota(0, 0, 0));
+
+  private Quotas() {
+    // Utility class.
+  }
+
+  /**
+   * Returns a quota with all resource vectors zeroed.
+   *
+   * @return A quota with all resource vectors zeroed.
+   */
+  public static IQuota noQuota() {
+    return NO_QUOTA;
+  }
+
+  /**
+   * Determines the amount of quota required for a set of job tasks.
+   *
+   * @param taskConfig Task template to count quota from.
+   * @return Quota requirement to run {@code job}.
+   */
+  public static IQuota fromTasks(ITaskConfig taskConfig, int instanceCount) {
+    return scale(fromProductionTasks(ImmutableSet.of(taskConfig)), instanceCount);
+  }
+
+  // TODO(Suman Karumuri): Refactor this function in to a new class.
+  // TODO(Suman Karumuri): Rename Quota to something more meaningful (ex: ResourceAggregate)
+  /**
+   * Determines the amount of quota required for production tasks among {@code tasks}.
+   *
+   * @param tasks Tasks to count quota from.
+   * @return Quota requirement to run {@code tasks}.
+   */
+  public static IQuota fromProductionTasks(Iterable<ITaskConfig> tasks) {
+    checkNotNull(tasks);
+
+    return fromTasks(Iterables.filter(tasks, Tasks.IS_PRODUCTION));
+  }
+
+  /**
+   * Determines the amount of quota required for the given tasks.
+   *
+   * @param tasks Tasks to count quota from.
+   * @return Quota requirement to run {@code tasks}.
+   */
+  public static IQuota fromTasks(Iterable<ITaskConfig> tasks) {
+    double cpu = 0;
+    int ramMb = 0;
+    int diskMb = 0;
+    for (ITaskConfig task : tasks) {
+      cpu += task.getNumCpus();
+      ramMb += task.getRamMb();
+      diskMb += task.getDiskMb();
+    }
+
+    return IQuota.build(new Quota()
+        .setNumCpus(cpu)
+        .setRamMb(ramMb)
+        .setDiskMb(diskMb));
+  }
+
+  /**
+   * a >= b
+   */
+  public static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
+    return QuotaComparisonResult.greaterOrEqual(a, b);
+  }
+
+  /**
+   * a + b
+   */
+  public static IQuota add(IQuota a, IQuota b) {
+    return IQuota.build(new Quota()
+        .setNumCpus(a.getNumCpus() + b.getNumCpus())
+        .setRamMb(a.getRamMb() + b.getRamMb())
+        .setDiskMb(a.getDiskMb() + b.getDiskMb()));
+  }
+
+  /**
+   * a - b
+   */
+  public static IQuota subtract(IQuota a, IQuota b) {
+    return IQuota.build(new Quota()
+        .setNumCpus(a.getNumCpus() - b.getNumCpus())
+        .setRamMb(a.getRamMb() - b.getRamMb())
+        .setDiskMb(a.getDiskMb() - b.getDiskMb()));
+  }
+
+  /**
+   * a * m
+   */
+  public static IQuota scale(IQuota a, int m) {
+    return IQuota.build(new Quota()
+        .setNumCpus(a.getNumCpus() * m)
+        .setRamMb(a.getRamMb() * m)
+        .setDiskMb(a.getDiskMb() * m));
+  }
+
+  /**
+   * a / b
+   * <p>
+   * This calculates how many times {@code b} "fits into" {@code a}.  Behavior is undefined when
+   * {@code b} contains resources with a value of zero.
+   */
+  public static int divide(IQuota a, IQuota b) {
+    return Ordering.natural().min(
+        a.getNumCpus() / b.getNumCpus(),
+        (double) a.getRamMb() / b.getRamMb(),
+        (double) a.getDiskMb() / b.getDiskMb()
+    ).intValue();
+  }
+
+  /**
+   * sum(qs)
+   */
+  public static IQuota sum(Iterable<IQuota> qs) {
+    IQuota sum = noQuota();
+    for (IQuota q : qs) {
+      sum = Quotas.add(sum, q);
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
new file mode 100644
index 0000000..d47c4af
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.twitter.aurora.gen.CronCollisionPolicy;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.ScheduleException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import com.twitter.aurora.scheduler.configuration.SanitizedConfiguration;
+import com.twitter.aurora.scheduler.cron.CronException;
+import com.twitter.aurora.scheduler.cron.CronScheduler;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffHelper;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+
/**
 * A job scheduler that receives jobs that should be run periodically on a cron schedule.
 * <p>
 * Jobs are registered with an external {@link CronScheduler}; when a job's schedule fires,
 * tasks are created according to the job's {@link CronCollisionPolicy} relative to any
 * still-active tasks from a previous run.
 */
public class CronJobManager extends JobManager implements EventSubscriber {

  public static final String MANAGER_KEY = "CRON";

  @VisibleForTesting
  static final String CRON_USER = "cron";

  private static final Logger LOG = Logger.getLogger(CronJobManager.class.getName());

  @CmdLine(name = "cron_start_initial_backoff", help =
      "Initial backoff delay while waiting for a previous cron run to start.")
  private static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
      Arg.create(Amount.of(1L, Time.SECONDS));

  @CmdLine(name = "cron_start_max_backoff", help =
      "Max backoff delay while waiting for a previous cron run to start.")
  private static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
      Arg.create(Amount.of(1L, Time.MINUTES));

  private final AtomicLong cronJobsTriggered = Stats.exportLong("cron_jobs_triggered");
  private final AtomicLong cronJobLaunchFailures = Stats.exportLong("cron_job_launch_failures");

  // Maps from the unique job identifier to the unique identifier used internally by the cron
  // scheduler.
  private final Map<IJobKey, String> scheduledJobs =
      Collections.synchronizedMap(Maps.<IJobKey, String>newHashMap());

  // Prevents runs from dogpiling while waiting for a run to transition out of the KILLING state.
  // This is necessary because killing a job (if dictated by cron collision policy) is an
  // asynchronous operation.
  private final Map<IJobKey, SanitizedConfiguration> pendingRuns =
      Collections.synchronizedMap(Maps.<IJobKey, SanitizedConfiguration>newHashMap());

  private final StateManager stateManager;
  private final Storage storage;
  private final CronScheduler cron;
  private final ShutdownRegistry shutdownRegistry;
  private final BackoffHelper delayedStartBackoff;
  private final Executor delayedRunExecutor;

  @Inject
  CronJobManager(
      StateManager stateManager,
      Storage storage,
      CronScheduler cron,
      ShutdownRegistry shutdownRegistry) {

    // Daemon threads are used for delayed runs so they cannot keep the JVM alive on shutdown.
    this(
        stateManager,
        storage,
        cron,
        shutdownRegistry,
        Executors.newCachedThreadPool(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CronDelay-%d").build()));
  }

  @VisibleForTesting
  CronJobManager(
      StateManager stateManager,
      Storage storage,
      CronScheduler cron,
      ShutdownRegistry shutdownRegistry,
      Executor delayedRunExecutor) {

    this.stateManager = checkNotNull(stateManager);
    this.storage = checkNotNull(storage);
    this.cron = checkNotNull(cron);
    this.shutdownRegistry = checkNotNull(shutdownRegistry);
    this.delayedStartBackoff =
        new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get());
    this.delayedRunExecutor = checkNotNull(delayedRunExecutor);

    Stats.exportSize("cron_num_pending_runs", pendingRuns);
  }

  // Records the cron scheduler's internal key for a job, refusing double-registration.
  private void mapScheduledJob(IJobConfiguration job, String scheduledJobKey) {
    IJobKey jobKey = job.getKey();
    synchronized (scheduledJobs) {
      Preconditions.checkState(
          !scheduledJobs.containsKey(jobKey),
          "Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
      scheduledJobs.put(jobKey, scheduledJobKey);
    }
  }

  /**
   * Notifies the cron job manager that storage is started, and job configurations are ready to
   * load.
   *
   * @param storageStarted Event.
   */
  @Subscribe
  public void storageStarted(StorageStarted storageStarted) {
    cron.start();
    shutdownRegistry.addAction(new ExceptionalCommand<CronException>() {
      @Override public void execute() throws CronException {
        cron.stop();
      }
    });

    Iterable<IJobConfiguration> crons =
        storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
          @Override public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
            return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
          }
        });

    // Re-register every persisted cron job with the cron scheduler.  A failure to schedule one
    // job is logged and counted but does not abort recovery of the remaining jobs.
    for (IJobConfiguration job : crons) {
      try {
        mapScheduledJob(job, scheduleJob(SanitizedConfiguration.fromUnsanitized(job)));
      } catch (ScheduleException | TaskDescriptionException e) {
        logLaunchFailure(job, e);
      }
    }
  }

  // Bumps the failure counter and logs; used during recovery in storageStarted.
  private void logLaunchFailure(IJobConfiguration job, Exception e) {
    cronJobLaunchFailures.incrementAndGet();
    LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
  }

  /**
   * Triggers execution of a job.
   *
   * @param jobKey Key of the job to start.
   * @throws TaskDescriptionException If the stored job configuration fails sanitization.
   */
  public void startJobNow(IJobKey jobKey) throws TaskDescriptionException {
    checkNotNull(jobKey);

    Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
    checkArgument(jobConfig.isPresent(), "No such cron job " + JobKeys.toPath(jobKey));

    cronTriggered(SanitizedConfiguration.fromUnsanitized(jobConfig.get()));
  }

  // Registers a deferred launch: the run is parked in pendingRuns and a background task polls
  // until the previous run's tasks are gone.  A second trigger while one is pending is a no-op
  // (the put returning non-null), preventing dogpiled waiters.
  private void delayedRun(final Query.Builder query, final SanitizedConfiguration config) {
    IJobConfiguration job = config.getJobConfig();
    final String jobPath = JobKeys.toPath(job);
    final IJobKey jobKey = job.getKey();
    LOG.info("Waiting for job to terminate before launching cron job " + jobPath);
    if (pendingRuns.put(jobKey, config) == null) {
      LOG.info("Launching a task to wait for job to finish: " + jobPath);
      // There was no run already pending for this job, launch a task to delay launch until the
      // existing run has terminated.
      delayedRunExecutor.execute(new Runnable() {
        @Override public void run() {
          runWhenTerminated(query, jobKey);
        }
      });
    }
  }

  // Polls with exponential backoff (bounded by the cron_start_*_backoff args) until the active
  // tasks matching the query are gone, then launches the pending run for this job.
  private void runWhenTerminated(final Query.Builder query, final IJobKey jobKey) {
    try {
      delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
        @Override public Boolean get() {
          if (!hasTasks(query)) {
            LOG.info("Initiating delayed launch of cron " + jobKey);
            SanitizedConfiguration config = pendingRuns.remove(jobKey);
            checkNotNull(config, "Failed to fetch job for delayed run of " + jobKey);
            LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
            stateManager.insertPendingTasks(config.getTaskConfigs());
            return true;
          } else {
            LOG.info("Not yet safe to run cron " + jobKey);
            return false;
          }
        }
      });
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the executor can observe it.
      LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + jobKey, e);
      Thread.currentThread().interrupt();
    }
  }

  // True when any task (in any state matched by the query) still exists.
  private boolean hasTasks(Query.Builder query) {
    return !Storage.Util.consistentFetchTasks(storage, query).isEmpty();
  }

  /**
   * Substitutes the default collision policy (KILL_EXISTING) when none is set on a job.
   *
   * @param policy Possibly-null policy from a job configuration.
   * @return {@code policy}, or KILL_EXISTING if {@code policy} is null.
   */
  public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
    return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
  }

  /**
   * Triggers execution of a cron job, depending on the cron collision policy for the job.
   *
   * @param config The config of the job to be triggered.
   */
  @VisibleForTesting
  void cronTriggered(SanitizedConfiguration config) {
    IJobConfiguration job = config.getJobConfig();
    LOG.info(String.format("Cron triggered for %s at %s with policy %s",
        JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
    cronJobsTriggered.incrementAndGet();

    ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
    final Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
    Set<IScheduledTask> activeTasks = Storage.Util.consistentFetchTasks(storage, activeQuery);

    if (activeTasks.isEmpty()) {
      // No collision: launch all instances immediately.
      builder.putAll(config.getTaskConfigs());
    } else {
      // Assign a default collision policy.
      CronCollisionPolicy collisionPolicy = orDefault(job.getCronCollisionPolicy());

      switch (collisionPolicy) {
        case KILL_EXISTING:
          try {
            // NOTE(review): schedulerCore is not declared in this class — presumably an
            // inherited field from JobManager; confirm.
            schedulerCore.killTasks(activeQuery, CRON_USER);
            // Check immediately if the tasks are gone.  This could happen if the existing tasks
            // were pending.
            if (!hasTasks(activeQuery)) {
              builder.putAll(config.getTaskConfigs());
            } else {
              // Kill is asynchronous; defer this run until the old tasks have terminated.
              delayedRun(activeQuery, config);
            }
          } catch (ScheduleException e) {
            LOG.log(Level.SEVERE, "Failed to kill job.", e);
          }
          break;

        case CANCEL_NEW:
          // Existing run wins; this trigger is dropped.
          break;

        case RUN_OVERLAP:
          Map<Integer, IScheduledTask> byInstance =
              Maps.uniqueIndex(activeTasks, Tasks.SCHEDULED_TO_INSTANCE_ID);
          Map<Integer, ScheduleStatus> existingTasks =
              Maps.transformValues(byInstance, Tasks.GET_STATUS);
          if (existingTasks.isEmpty()) {
            builder.putAll(config.getTaskConfigs());
          } else if (Iterables.any(existingTasks.values(), Predicates.equalTo(PENDING))) {
            LOG.info("Job " + JobKeys.toPath(job) + " has pending tasks, suppressing run.");
          } else {
            // To safely overlap this run, we need to adjust the instance IDs of the overlapping
            // run (maintaining the role/job/instance UUID invariant).
            int instanceOffset = Ordering.natural().max(existingTasks.keySet()) + 1;
            LOG.info("Adjusting instance IDs of " + JobKeys.toPath(job) + " by " + instanceOffset
                + " for overlapping cron run.");
            for (Map.Entry<Integer, ITaskConfig> entry : config.getTaskConfigs().entrySet()) {
              builder.put(entry.getKey() + instanceOffset, entry.getValue());
            }
          }
          break;

        default:
          LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
      }
    }

    Map<Integer, ITaskConfig> newTasks = builder.build();
    if (!newTasks.isEmpty()) {
      stateManager.insertPendingTasks(newTasks);
    }
  }

  /**
   * Updates (re-schedules) the existing cron job.
   *
   * @param config New job configuration to update to.
   * @throws ScheduleException If a non-cron job configuration is provided.
   */
  public void updateJob(SanitizedConfiguration config) throws ScheduleException {
    IJobConfiguration job = config.getJobConfig();
    if (!hasCronSchedule(job)) {
      throw new ScheduleException("A cron job may not be updated to a non-cron job.");
    }
    // Deschedule the old cron entry, then re-register via receiveJob (which also persists).
    String key = scheduledJobs.remove(job.getKey());
    checkNotNull(key, "Attempted to update unknown job " + JobKeys.toPath(job));
    cron.deschedule(key);
    checkArgument(receiveJob(config));
  }

  @Override
  public String getUniqueKey() {
    return MANAGER_KEY;
  }

  // A job belongs to this manager iff it carries a non-empty cron schedule string.
  private static boolean hasCronSchedule(IJobConfiguration job) {
    checkNotNull(job);
    return !StringUtils.isEmpty(job.getCronSchedule());
  }

  /**
   * Accepts a job if it has a cron schedule: registers it with the cron scheduler, persists
   * it in the job store, and records the internal cron key.
   *
   * @param config Sanitized job to accept.
   * @return {@code true} if the job was accepted by this manager, {@code false} if it has no
   *     cron schedule.
   * @throws ScheduleException If the cron schedule is invalid or registration fails.
   */
  @Override
  public boolean receiveJob(SanitizedConfiguration config) throws ScheduleException {
    final IJobConfiguration job = config.getJobConfig();
    if (!hasCronSchedule(job)) {
      return false;
    }

    String scheduledJobKey = scheduleJob(config);
    storage.write(new MutateWork.NoResult.Quiet() {
      @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
        storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
      }
    });
    mapScheduledJob(job, scheduledJobKey);

    return true;
  }

  // Validates the cron expression and registers a trigger that fires cronTriggered; returns
  // the cron scheduler's internal key for later descheduling.
  private String scheduleJob(final SanitizedConfiguration config) throws ScheduleException {
    final IJobConfiguration job = config.getJobConfig();
    final String jobPath = JobKeys.toPath(job);
    if (!hasCronSchedule(job)) {
      throw new ScheduleException(
          String.format("Not a valid cronjob, %s has no cron schedule", jobPath));
    }

    if (!cron.isValidSchedule(job.getCronSchedule())) {
      throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
    }

    LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
    try {
      return cron.schedule(job.getCronSchedule(), new Runnable() {
        @Override public void run() {
          // TODO(William Farner): May want to record information about job runs.
          LOG.info("Running cron job: " + jobPath);
          cronTriggered(config);
        }
      });
    } catch (CronException e) {
      throw new ScheduleException("Failed to schedule cron job: " + e.getMessage(), e);
    }
  }

  @Override
  public Iterable<IJobConfiguration> getJobs() {
    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
      @Override
      public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
        return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
      }
    });
  }

  @Override
  public boolean hasJob(IJobKey jobKey) {
    return fetchJob(jobKey).isPresent();
  }

  // Looks up a stored job configuration owned by this manager.
  private Optional<IJobConfiguration> fetchJob(final IJobKey jobKey) {
    checkNotNull(jobKey);
    return storage.consistentRead(new Work.Quiet<Optional<IJobConfiguration>>() {
      @Override public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
        return storeProvider.getJobStore().fetchJob(MANAGER_KEY, jobKey);
      }
    });
  }

  /**
   * Deschedules and removes a cron job.
   *
   * @param jobKey Key of the job to delete.
   * @return {@code false} if no such job exists in the store, {@code true} otherwise.
   */
  @Override
  public boolean deleteJob(final IJobKey jobKey) {
    checkNotNull(jobKey);

    if (!hasJob(jobKey)) {
      return false;
    }

    // NOTE(review): if the job exists in the store but has no cron entry in scheduledJobs,
    // this returns true without removing the store entry — confirm this is intended.
    String scheduledJobKey = scheduledJobs.remove(jobKey);
    if (scheduledJobKey != null) {
      cron.deschedule(scheduledJobKey);
      storage.write(new MutateWork.NoResult.Quiet() {
        @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
          storeProvider.getJobStore().removeJob(jobKey);
        }
      });
      LOG.info("Successfully deleted cron job " + jobKey);
    }
    return true;
  }

  // Renders a cron scheduler key as its schedule string, for display in getScheduledJobs.
  private final Function<String, String> keyToSchedule = new Function<String, String>() {
    @Override public String apply(String key) {
      return cron.getSchedule(key).or("Not found.");
    }
  };

  // Snapshot of job key -> cron schedule string for all registered jobs.
  public Map<IJobKey, String> getScheduledJobs() {
    synchronized (scheduledJobs) {
      return ImmutableMap.copyOf(Maps.transformValues(scheduledJobs, keyToSchedule));
    }
  }

  // Snapshot of jobs currently waiting on a previous run to terminate.
  public Set<IJobKey> getPendingRuns() {
    synchronized (pendingRuns) {
      return ImmutableSet.copyOf(pendingRuns.keySet());
    }
  }
}


Mime
View raw message