aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] aurora git commit: Add RemoveJobUpdates log Op, slim JobUpdateStore API
Date Wed, 29 Nov 2017 00:59:13 GMT
Add RemoveJobUpdates log Op, slim JobUpdateStore API

JobUpdateStore historically had granular APIs in the storage layer to
minimize unnecessary use of 'expensive' database objects.  The
in-memory store makes these 'free', so moving business logic out of
the storage layer is now feasible for performance and pragmatic.

This patch also introduces the `RemoveJobUpdates` log `Op`, and
`PruneJobUpdateHistory` is now ignored.  In a future release (and possibly
before, with a feature flag), the scheduler will write `RemoveJobUpdates`
to the log.

LogStorage has always had the fundamental expectation that `Op`s are
idempotent.  The job update event `Op`s arguably violate this requirement, but
at minimum, explicit removal of updates is necessary for idempotency.

>From LogStorage.java:

    This design implies that all mutations must be idempotent and free from
    constraint and thus replayable over newer operations when recovering
    from an old checkpoint.

Reviewed at https://reviews.apache.org/r/63884/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/284f40f5
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/284f40f5
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/284f40f5

Branch: refs/heads/master
Commit: 284f40f5e36c70114e6229fcb93e3b203d2f1120
Parents: 80139da
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Nov 28 16:58:47 2017 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Nov 28 16:58:47 2017 -0800

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/storage.thrift |   5 +
 .../org/apache/aurora/benchmark/JobUpdates.java |   2 +-
 .../aurora/benchmark/UpdateStoreBenchmarks.java |  12 +-
 .../pruning/JobUpdateHistoryPruner.java         |  94 ++-
 .../aurora/scheduler/pruning/PruningModule.java |  11 +-
 .../scheduler/pruning/TaskHistoryPruner.java    |   8 +-
 .../aurora/scheduler/quota/QuotaManager.java    |  30 +-
 .../scheduler/storage/JobUpdateStore.java       |  83 +--
 .../scheduler/storage/log/LogStorage.java       |  10 +-
 .../storage/log/SnapshotStoreImpl.java          |   4 +-
 .../storage/log/WriteAheadStorage.java          |  55 +-
 .../storage/mem/MemJobUpdateStore.java          | 158 +----
 .../scheduler/thrift/ReadOnlySchedulerImpl.java |  21 +-
 .../updater/JobUpdateControllerImpl.java        |  84 ++-
 .../pruning/JobUpdateHistoryPrunerTest.java     | 170 +++--
 .../pruning/TaskHistoryPrunerTest.java          |   4 +-
 .../scheduler/quota/QuotaManagerImplTest.java   | 131 ++--
 .../storage/AbstractJobUpdateStoreTest.java     | 711 +++++--------------
 .../scheduler/storage/log/LogStorageTest.java   |  30 +-
 .../storage/log/WriteAheadStorageTest.java      |  28 +-
 .../thrift/ReadOnlySchedulerImplTest.java       |  23 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   8 +-
 22 files changed, 623 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index c692a5f..2210497 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -58,6 +58,10 @@ struct SaveJobUpdate {
   // 2: deleted
 }
 
+struct RemoveJobUpdates {
+  1: set<api.JobUpdateKey> keys
+}
+
 struct StoredJobUpdateDetails {
   1: api.JobUpdateDetails details
   // 2: deleted
@@ -94,6 +98,7 @@ union Op {
   15: SaveJobUpdateEvent saveJobUpdateEvent
   16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
   17: PruneJobUpdateHistory pruneJobUpdateHistory
+  18: RemoveJobUpdates removeJobUpdate
 }
 
 // The current schema version ID.  This should be incremented each time the

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index a5d1894..7557301 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -63,7 +63,7 @@ final class JobUpdates {
     ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
     storage.write((Storage.MutateWork.NoResult.Quiet) store -> {
       JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
-      updateStore.deleteAllUpdatesAndEvents();
+      updateStore.deleteAllUpdates();
       for (IJobUpdateDetails details : updates) {
         IJobUpdateKey key = details.getUpdate().getSummary().getKey();
         keyBuilder.add(key);

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index c98c514..e41db20 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -67,13 +67,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }
@@ -106,13 +106,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }
@@ -145,13 +145,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
index b2768d9..05ada3c 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
@@ -13,39 +13,51 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.AbstractScheduledService;
 
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.storage.JobUpdateStore.TERMINAL_STATES;
+
 /**
  * Prunes per-job update history on a periodic basis.
  */
-class JobUpdateHistoryPruner extends AbstractIdleService {
+class JobUpdateHistoryPruner extends AbstractScheduledService {
   private static final Logger LOG = LoggerFactory.getLogger(JobUpdateHistoryPruner.class);
   @VisibleForTesting
   static final String JOB_UPDATES_PRUNED = "job_updates_pruned";
 
   private final Clock clock;
-  private final ScheduledExecutorService executor;
   private final Storage storage;
   private final HistoryPrunerSettings settings;
   private final AtomicLong prunedUpdatesCount;
@@ -69,38 +81,80 @@ class JobUpdateHistoryPruner extends AbstractIdleService {
   @Inject
   JobUpdateHistoryPruner(
       Clock clock,
-      ScheduledExecutorService executor,
       Storage storage,
       HistoryPrunerSettings settings,
       StatsProvider statsProvider) {
 
     this.clock = requireNonNull(clock);
-    this.executor = requireNonNull(executor);
     this.storage = requireNonNull(storage);
     this.settings = requireNonNull(settings);
     this.prunedUpdatesCount = statsProvider.makeCounter(JOB_UPDATES_PRUNED);
   }
 
   @Override
-  protected void startUp() {
-    executor.scheduleAtFixedRate(
-        () -> storage.write((NoResult.Quiet) storeProvider -> {
-          Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
-              settings.maxUpdatesPerJob,
-              clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
-
-          prunedUpdatesCount.addAndGet(prunedUpdates.size());
-          LOG.info(prunedUpdates.isEmpty()
-              ? "No job update history to prune."
-              : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
-        }),
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedDelaySchedule(
         settings.pruneInterval.as(Time.MILLISECONDS),
         settings.pruneInterval.as(Time.MILLISECONDS),
         TimeUnit.MILLISECONDS);
   }
 
+  @VisibleForTesting
+  void runForTest() {
+    runOneIteration();
+  }
+
+  @Timed("job_update_store_prune_history")
   @Override
-  protected void shutDown() {
-    // Nothing to do - await VM shutdown.
+  protected void runOneIteration() {
+    storage.write((NoResult.Quiet) storeProvider -> {
+
+      List<IJobUpdateSummary> completedUpdates = storeProvider.getJobUpdateStore()
+          .fetchJobUpdates(IJobUpdateQuery.build(
+              new JobUpdateQuery().setUpdateStatuses(TERMINAL_STATES)))
+          .stream()
+          .map(u -> u.getUpdate().getSummary())
+          .collect(Collectors.toList());
+
+      long cutoff = clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS);
+      Predicate<IJobUpdateSummary> expiredFilter =
+          s -> s.getState().getCreatedTimestampMs() < cutoff;
+
+      ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
+
+      // Gather updates based on time threshold.
+      pruneBuilder.addAll(completedUpdates
+          .stream()
+          .filter(expiredFilter)
+          .map(IJobUpdateSummary::getKey)
+          .collect(Collectors.toList()));
+
+      Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
+          // Avoid counting to-be-removed expired updates.
+          completedUpdates.stream().filter(expiredFilter.negate()).iterator(),
+          s -> s.getKey().getJob());
+
+      updatesByJob.asMap().values().forEach(updates -> {
+        if (updates.size() > settings.maxUpdatesPerJob) {
+          Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
+              .onResultOf(s -> s.getState().getCreatedTimestampMs());
+          pruneBuilder.addAll(creationOrder
+              .leastOf(updates, updates.size() - settings.maxUpdatesPerJob)
+              .stream()
+              .map(IJobUpdateSummary::getKey)
+              .iterator());
+        }
+      });
+
+      Set<IJobUpdateKey> pruned = pruneBuilder.build();
+      if (!pruned.isEmpty()) {
+        storeProvider.getJobUpdateStore().removeJobUpdates(pruned);
+      }
+
+      prunedUpdatesCount.addAndGet(pruned.size());
+      LOG.info(pruned.isEmpty()
+          ? "No job update history to prune."
+          : "Pruned job update history: " + Joiner.on(",").join(pruned));
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
index 4433b96..0ed22d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,11 +78,11 @@ public class PruningModule extends AbstractModule {
       protected void configure() {
         // TODO(ksweeney): Create a configuration validator module so this can be injected.
         // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
-        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
-            options.historyPruneThreshold,
-            options.historyMinRetentionThreshold,
-            options.historyMaxPerJobThreshold
-        ));
+        bind(TaskHistoryPruner.HistoryPrunerSettings.class).toInstance(
+            new TaskHistoryPruner.HistoryPrunerSettings(
+                options.historyPruneThreshold,
+                options.historyMinRetentionThreshold,
+                options.historyMaxPerJobThreshold));
 
         bind(TaskHistoryPruner.class).in(Singleton.class);
         expose(TaskHistoryPruner.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 3cafbc2..9aa51c3 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -65,7 +65,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final Clock clock;
-  private final HistoryPrunnerSettings settings;
+  private final HistoryPrunerSettings settings;
   private final Storage storage;
   private final Lifecycle lifecycle;
   private final TaskEventBatchWorker batchWorker;
@@ -79,12 +79,12 @@ public class TaskHistoryPruner implements EventSubscriber {
     }
   };
 
-  static class HistoryPrunnerSettings {
+  static class HistoryPrunerSettings {
     private final long pruneThresholdMillis;
     private final long minRetentionThresholdMillis;
     private final int perJobHistoryGoal;
 
-    HistoryPrunnerSettings(
+    HistoryPrunerSettings(
         Amount<Long, Time> inactivePruneThreshold,
         Amount<Long, Time> minRetentionThreshold,
         int perJobHistoryGoal) {
@@ -100,7 +100,7 @@ public class TaskHistoryPruner implements EventSubscriber {
       @AsyncExecutor ScheduledExecutorService executor,
       StateManager stateManager,
       Clock clock,
-      HistoryPrunnerSettings settings,
+      HistoryPrunerSettings settings,
       Storage storage,
       Lifecycle lifecycle,
       TaskEventBatchWorker batchWorker,

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 7f8c66c..64ad12b 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.quota;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -26,7 +27,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.RangeSet;
 
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -48,7 +47,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -274,8 +272,13 @@ public interface QuotaManager {
           .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
           .transform(IScheduledTask::getAssignedTask);
 
-      Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap(
-          fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role));
+      // Relies on the invariant of at-most-one active update per job.
+      Map<IJobKey, IJobUpdateInstructions> updates = storeProvider.getJobUpdateStore()
+          .fetchJobUpdates(updateQuery(role))
+          .stream()
+          .collect(Collectors.toMap(
+              u -> u.getUpdate().getSummary().getKey().getJob(),
+              u -> u.getUpdate().getInstructions()));
 
       // Mix in a requested job update (if present) to correctly calculate consumption.
       // This would be an update that is not saved in the store yet (i.e. the one quota is
@@ -398,20 +401,6 @@ public interface QuotaManager {
       };
     }
 
-    private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates(
-        final JobUpdateStore jobUpdateStore,
-        String role) {
-
-      Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
-          summary -> jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
-
-      return Maps.transformValues(
-          FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
-              .transform(fetchUpdate)
-              .uniqueIndex(UPDATE_TO_JOB_KEY),
-          IJobUpdate::getInstructions);
-    }
-
     @VisibleForTesting
     static IJobUpdateQuery updateQuery(String role) {
       return IJobUpdateQuery.build(new JobUpdateQuery()
@@ -474,9 +463,6 @@ public interface QuotaManager {
       return addAll(Iterables.transform(tasks, QUOTA_RESOURCES));
     }
 
-    private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
-        input -> input.getSummary().getKey().getJob();
-
     private static int getUpdateInstanceCount(Set<IRange> ranges) {
       int instanceCount = 0;
       for (IRange range : ranges) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index b3d906b..6b91d97 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -19,15 +19,14 @@ import java.util.Set;
 
 import com.google.common.base.Optional;
 
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
@@ -48,21 +47,15 @@ public interface JobUpdateStore {
       ERROR
   );
 
-  /**
-   * Fetches a read-only view of job update summaries.
-   *
-   * @param query Query to identify job update summaries with.
-   * @return A read-only view of job update summaries.
-   */
-  List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query);
+  IJobUpdateQuery MATCH_ALL = IJobUpdateQuery.build(new JobUpdateQuery());
 
   /**
    * Fetches a read-only view of job update details matching the {@code query}.
    *
    * @param query Query to identify job update details with.
-   * @return A read-only list view of job update details matching the query.
+   * @return A read-only view of job update details matching the query.
    */
-  List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query);
+  List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query);
 
   /**
    * Fetches a read-only view of job update details.
@@ -70,57 +63,12 @@ public interface JobUpdateStore {
    * @param key Update identifier.
    * @return A read-only view of job update details.
    */
-  Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of a job update.
-   *
-   * @param key Update identifier.
-   * @return A read-only view of job update.
-   */
-  Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of the instructions for a job update.
-   *
-   * @param key Update identifier.
-   * @return A read-only view of job update instructions.
-   */
-  Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of all job update details available in the store.
-   * TODO(wfarner): Generate immutable wrappers for storage.thrift structs, use an immutable object
-   *                here.
-   *
-   * @return A read-only view of all job update details.
-   */
-  Set<IJobUpdateDetails> fetchAllJobUpdateDetails();
-  /**
-   * Fetches the events that have affected an instance within a job update.
-   *
-   * @param key Update identifier.
-   * @param instanceId Instance to fetch events for.
-   * @return Instance events in {@code key} that affected {@code instanceId}.
-   */
-  List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId);
+  Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key);
 
   interface Mutable extends JobUpdateStore {
 
     /**
-     * Saves a new job update.
-     *
-     * <p>
-     * Note: This call must be followed by the
-     * {@link #saveJobUpdateEvent(IJobUpdateKey, IJobUpdateEvent)} before fetching a saved update as
-     * it does not save the following required fields:
-     * <ul>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#status}</li>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#createdTimestampMs}</li>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#lastModifiedTimestampMs}</li>
-     * </ul>
-     * The above fields are auto-populated from the update events and any attempt to fetch an update
-     * without having at least one {@link IJobUpdateEvent} present in the store will return empty.
+     * Saves a job update.
      *
      * @param update Update to save.
      */
@@ -143,22 +91,15 @@ public interface JobUpdateStore {
     void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event);
 
     /**
-     * Deletes all updates and update events from the store.
+     * Deletes job updates.
+     *
+     * @param keys Keys of the updates to delete.
      */
-    void deleteAllUpdatesAndEvents();
+    void removeJobUpdates(Set<IJobUpdateKey> keys);
 
     /**
-     * Prunes (deletes) old completed updates and events from the store.
-     * <p>
-     * At least {@code perJobRetainCount} last completed updates that completed less than
-     * {@code historyPruneThreshold} ago will be kept for every job.
-     *
-     * @param perJobRetainCount Number of completed updates to retain per job.
-     * @param historyPruneThresholdMs Earliest timestamp in the past to retain history.
-     *                                Any completed updates created before this timestamp
-     *                                will be pruned.
-     * @return Set of pruned update keys.
+     * Deletes all updates from the store.
      */
-    Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
+    void deleteAllUpdates();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3ce2c7f..07b4bdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -365,9 +365,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
               IJobUpdateKey.build(event.getKey()),
               IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
         })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> writeBehindJobUpdateStore.pruneHistory(
-            op.getPruneJobUpdateHistory().getPerJobRetainCount(),
-            op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs())).build();
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+          LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        })
+        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.removeJobUpdates(
+              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+        .build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 57c483b..5859f80 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -218,7 +218,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         @Override
         public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setJobUpdateDetails(
-              store.getJobUpdateStore().fetchAllJobUpdateDetails().stream()
+              store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
                   .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
                   .collect(Collectors.toSet()));
         }
@@ -227,7 +227,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           if (snapshot.getJobUpdateDetailsSize() > 0) {
             JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
-            updateStore.deleteAllUpdatesAndEvents();
+            updateStore.deleteAllUpdates();
             for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
               JobUpdateDetails details = storedDetails.getDetails();
               updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 4d051fc..41061f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
 import org.apache.aurora.gen.storage.RemoveJob;
 import org.apache.aurora.gen.storage.RemoveQuota;
 import org.apache.aurora.gen.storage.RemoveTasks;
@@ -52,10 +51,8 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
@@ -237,21 +234,12 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
-    Set<IJobUpdateKey> prunedUpdates = jobUpdateStore.pruneHistory(
-        perJobRetainCount,
-        historyPruneThresholdMs);
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
 
-    if (!prunedUpdates.isEmpty()) {
-      // Pruned updates will eventually go away from persisted storage when a new snapshot is cut.
-      // So, persisting pruning attempts is not strictly necessary as the periodic pruner will
-      // provide eventual consistency between volatile and persistent storage upon scheduler
-      // restart. By generating an out of band pruning during log replay the consistency is
-      // achieved sooner without potentially exposing pruned but not yet persisted data.
-      write(Op.pruneJobUpdateHistory(
-          new PruneJobUpdateHistory(perJobRetainCount, historyPruneThresholdMs)));
-    }
-    return prunedUpdates;
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
   }
 
   @Override
@@ -279,7 +267,7 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public void deleteAllUpdatesAndEvents() {
+  public void deleteAllUpdates() {
     throw new UnsupportedOperationException(
         "Unsupported since casual storage users should never be doing this.");
   }
@@ -370,37 +358,12 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdateSummaries(query);
-  }
-
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdateDetails(query);
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
   }
 
   @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdateDetails(key);
-  }
-
-  @Override
-  public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
     return this.jobUpdateStore.fetchJobUpdate(key);
   }
-
-  @Override
-  public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdateInstructions(key);
-  }
-
-  @Override
-  public Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return this.jobUpdateStore.fetchAllJobUpdateDetails();
-  }
-
-  @Override
-  public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) {
-    return this.jobUpdateStore.fetchInstanceEvents(key, instanceId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
index 826cee9..f96ec08 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -14,58 +14,39 @@
 
 package org.apache.aurora.scheduler.storage.mem;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import javax.inject.Inject;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 
 import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateEvent;
 import org.apache.aurora.gen.JobUpdateState;
-import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
-
 public class MemJobUpdateStore implements JobUpdateStore.Mutable {
 
   private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER = Ordering.natural()
@@ -73,88 +54,19 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
       .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
 
   private final Map<IJobUpdateKey, IJobUpdateDetails> updates = Maps.newConcurrentMap();
-  private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
-  private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
-
-  @Inject
-  public MemJobUpdateStore(StatsProvider statsProvider) {
-    this.jobUpdateEventStats = CacheBuilder.newBuilder()
-        .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
-          @Override
-          public AtomicLong load(JobUpdateStatus status) {
-            return statsProvider.makeCounter(jobUpdateStatusStatName(status));
-          }
-        });
-    for (JobUpdateStatus status : JobUpdateStatus.values()) {
-      jobUpdateEventStats.getUnchecked(status).get();
-    }
-    this.jobUpdateActionStats = CacheBuilder.newBuilder()
-        .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
-          @Override
-          public AtomicLong load(JobUpdateAction action) {
-            return statsProvider.makeCounter(jobUpdateActionStatName(action));
-          }
-        });
-    for (JobUpdateAction action : JobUpdateAction.values()) {
-      jobUpdateActionStats.getUnchecked(action).get();
-    }
-  }
 
-  @Timed("job_update_store_fetch_summaries")
+  @Timed("job_update_store_fetch_details_query")
   @Override
-  public synchronized List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
-    return performQuery(query)
-        .map(u -> u.getUpdate().getSummary())
-        .collect(Collectors.toList());
-  }
-
-  @Timed("job_update_store_fetch_details_list")
-  @Override
-  public synchronized List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
+  public synchronized List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
     return performQuery(query).collect(Collectors.toList());
   }
 
   @Timed("job_update_store_fetch_details")
   @Override
-  public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
+  public synchronized Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
     return Optional.fromNullable(updates.get(key));
   }
 
-  @Timed("job_update_store_fetch_update")
-  @Override
-  public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
-    return Optional.fromNullable(updates.get(key)).transform(IJobUpdateDetails::getUpdate);
-  }
-
-  @Timed("job_update_store_fetch_instructions")
-  @Override
-  public synchronized Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(
-      IJobUpdateKey key) {
-
-    return Optional.fromNullable(updates.get(key))
-        .transform(u -> u.getUpdate().getInstructions());
-  }
-
-  @Timed("job_update_store_fetch_all_details")
-  @Override
-  public synchronized Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return ImmutableSet.copyOf(updates.values());
-  }
-
-  @Timed("job_update_store_fetch_instance_events")
-  @Override
-  public synchronized List<IJobInstanceUpdateEvent> fetchInstanceEvents(
-      IJobUpdateKey key,
-      int instanceId) {
-
-    return java.util.Optional.ofNullable(updates.get(key))
-        .map(IJobUpdateDetails::getInstanceEvents)
-        .orElse(ImmutableList.of())
-        .stream()
-        .filter(e -> e.getInstanceId() == instanceId)
-        .collect(Collectors.toList());
-  }
-
   private static void validateInstructions(IJobUpdateInstructions instructions) {
     if (!instructions.isSetDesiredState() && instructions.getInitialState().isEmpty()) {
       throw new IllegalArgumentException(
@@ -182,10 +94,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     requireNonNull(update);
     validateInstructions(update.getInstructions());
 
-    if (updates.containsKey(update.getSummary().getKey())) {
-      throw new StorageException("Update already exists: " + update.getSummary().getKey());
-    }
-
     JobUpdateDetails mutable = new JobUpdateDetails()
         .setUpdate(update.newBuilder())
         .setUpdateEvents(ImmutableList.of())
@@ -211,7 +119,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents()));
     mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
     updates.put(key, IJobUpdateDetails.build(mutable));
-    jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
   }
 
   private static final Ordering<JobInstanceUpdateEvent> INSTANCE_EVENT_ORDERING = Ordering.natural()
@@ -233,66 +140,23 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents()));
     mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
     updates.put(key, IJobUpdateDetails.build(mutable));
-    jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
   }
 
-  @Timed("job_update_store_delete_all")
+  @Timed("job_update_store_delete_updates")
   @Override
-  public synchronized void deleteAllUpdatesAndEvents() {
-    updates.clear();
+  public synchronized void removeJobUpdates(Set<IJobUpdateKey> key) {
+    requireNonNull(key);
+    updates.keySet().removeAll(key);
   }
 
-  @Timed("job_update_store_prune_history")
+  @Timed("job_update_store_delete_all")
   @Override
-  public synchronized Set<IJobUpdateKey> pruneHistory(
-      int perJobRetainCount,
-      long historyPruneThresholdMs) {
-
-    Supplier<Stream<IJobUpdateSummary>> completedUpdates = () -> updates.values().stream()
-        .map(u -> u.getUpdate().getSummary())
-        .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus()));
-
-    Predicate<IJobUpdateSummary> expiredFilter =
-        s -> s.getState().getCreatedTimestampMs() < historyPruneThresholdMs;
-
-    ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
-
-    // Gather updates based on time threshold.
-    pruneBuilder.addAll(completedUpdates.get()
-        .filter(expiredFilter)
-        .map(IJobUpdateSummary::getKey)
-        .collect(Collectors.toList()));
-
-    Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
-        // Avoid counting to-be-removed expired updates.
-        completedUpdates.get().filter(expiredFilter.negate()).iterator(),
-        s -> s.getKey().getJob());
-
-    for (Map.Entry<IJobKey, Collection<IJobUpdateSummary>> entry
-        : updatesByJob.asMap().entrySet()) {
-
-      if (entry.getValue().size() > perJobRetainCount) {
-        Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
-            .onResultOf(s -> s.getState().getCreatedTimestampMs());
-        pruneBuilder.addAll(creationOrder
-            .leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount)
-            .stream()
-            .map(IJobUpdateSummary::getKey)
-            .iterator());
-      }
-    }
-
-    Set<IJobUpdateKey> pruned = pruneBuilder.build();
-    updates.keySet().removeAll(pruned);
-
-    return pruned;
+  public synchronized void deleteAllUpdates() {
+    updates.clear();
   }
 
   private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) {
-    JobUpdateState state = update.getUpdate().getSummary().getState();
-    if (state == null) {
-      state = new JobUpdateState();
-    }
+    JobUpdateState state = new JobUpdateState();
 
     JobUpdateEvent firstEvent = Iterables.getFirst(update.getUpdateEvents(), null);
     if (firstEvent != null) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index bba1161..9d327e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -308,11 +309,15 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   @Override
   public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) {
     IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
-    return ok(Result.getJobUpdateSummariesResult(
-        new GetJobUpdateSummariesResult()
-            .setUpdateSummaries(IJobUpdateSummary.toBuildersList(storage.read(
-                storeProvider ->
-                    storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query))))));
+
+    List<IJobUpdateSummary> summaries = storage.read(
+        storeProvider -> storeProvider.getJobUpdateStore()
+            .fetchJobUpdates(query)
+            .stream()
+            .map(u -> u.getUpdate().getSummary()).collect(Collectors.toList()));
+
+    return ok(Result.getJobUpdateSummariesResult(new GetJobUpdateSummariesResult()
+        .setUpdateSummaries(IJobUpdateSummary.toBuildersList(summaries))));
   }
 
   @Override
@@ -324,8 +329,8 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     if (mutableQuery != null) {
       IJobUpdateQuery query = IJobUpdateQuery.build(mutableQuery);
 
-      List<IJobUpdateDetails> details = storage.read(storeProvider ->
-          storeProvider.getJobUpdateStore().fetchJobUpdateDetails(query));
+      List<IJobUpdateDetails> details =
+          storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdates(query));
 
       return ok(Result.getJobUpdateDetailsResult(new GetJobUpdateDetailsResult()
           .setDetailsList(IJobUpdateDetails.toBuildersList(details))));
@@ -334,7 +339,7 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     // TODO(zmanji): Remove this code once `mutableKey` is removed in AURORA-1765
     IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
     Optional<IJobUpdateDetails> details = storage.read(storeProvider ->
-        storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key));
+        storeProvider.getJobUpdateStore().fetchJobUpdate(key));
 
     if (details.isPresent()) {
       return addMessage(ok(Result.getJobUpdateDetailsResult(

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index dc8d11c..87b18b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -13,17 +13,23 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -36,6 +42,7 @@ import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.JobInstanceUpdateEvent;
 import org.apache.aurora.gen.JobUpdateAction;
@@ -80,6 +87,8 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
 import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
 import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE;
@@ -123,6 +132,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final Map<IJobKey, UpdateFactory.Update> updates =
       Collections.synchronizedMap(Maps.newHashMap());
 
+  private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
+  private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
+
   @Inject
   JobUpdateControllerImpl(
       UpdateFactory updateFactory,
@@ -132,7 +144,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
       UpdateAgentReserver updateAgentReserver,
       Clock clock,
       Lifecycle lifecycle,
-      TaskEventBatchWorker batchWorker) {
+      TaskEventBatchWorker batchWorker,
+      StatsProvider statsProvider) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.storage = requireNonNull(storage);
@@ -143,6 +156,26 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.batchWorker = requireNonNull(batchWorker);
     this.pulseHandler = new PulseHandler(clock);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
+
+    this.jobUpdateEventStats = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateStatus status) {
+            return statsProvider.makeCounter(jobUpdateStatusStatName(status));
+          }
+        });
+    Arrays.stream(JobUpdateStatus.values())
+        .forEach(status -> jobUpdateEventStats.getUnchecked(status).get());
+
+    this.jobUpdateActionStats = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateAction action) {
+            return statsProvider.makeCounter(jobUpdateActionStatName(action));
+          }
+        });
+    Arrays.stream(JobUpdateAction.values())
+        .forEach(action -> jobUpdateActionStats.getUnchecked(action).get());
   }
 
   @Override
@@ -164,8 +197,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
         throw new IllegalArgumentException("Update instruction is a no-op.");
       }
 
-      List<IJobUpdateSummary> activeJobUpdates =
-          storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job));
+      List<IJobUpdateDetails> activeJobUpdates =
+          storeProvider.getJobUpdateStore().fetchJobUpdates(queryActiveByJob(job));
       if (!activeJobUpdates.isEmpty()) {
         if (activeJobUpdates.size() > 1) {
           LOG.error("Multiple active updates exist for this job. {}", activeJobUpdates);
@@ -173,11 +206,11 @@ class JobUpdateControllerImpl implements JobUpdateController {
               String.format("Multiple active updates exist for this job. %s", activeJobUpdates));
         }
 
-        IJobUpdateSummary activeJobUpdate = activeJobUpdates.get(0);
+        IJobUpdateDetails activeUpdate = activeJobUpdates.stream().findFirst().get();
         throw new UpdateInProgressException("An active update already exists for this job, "
             + "please terminate it before starting another. "
             + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES,
-            activeJobUpdate);
+            activeUpdate.getUpdate().getSummary());
       }
 
       LOG.info("Starting update for job " + job);
@@ -202,7 +235,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     requireNonNull(job);
 
     if (storage.read(p -> !p.getJobUpdateStore()
-        .fetchJobUpdateSummaries(queryActiveByJob(job)).isEmpty())) {
+        .fetchJobUpdates(queryActiveByJob(job)).isEmpty())) {
 
       throw new JobUpdatingException("Job is currently updating");
     }
@@ -225,14 +258,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
     requireNonNull(auditData);
     LOG.info("Attempting to resume update " + key);
     storage.write((NoResult<UpdateStateException>) storeProvider -> {
-      IJobUpdateDetails details = Iterables.getOnlyElement(
-          storeProvider.getJobUpdateStore().fetchJobUpdateDetails(queryByUpdate(key)), null);
+      Optional<IJobUpdateDetails> details = storeProvider.getJobUpdateStore().fetchJobUpdate(key);
 
-      if (details == null) {
+      if (!details.isPresent()) {
         throw new UpdateStateException("Update does not exist: " + key);
       }
 
-      IJobUpdate update = details.getUpdate();
+      IJobUpdate update = details.get().getUpdate();
       IJobUpdateKey key1 = update.getSummary().getKey();
       Function<JobUpdateStatus, JobUpdateStatus> stateChange =
           isCoordinatedAndPulseExpired(key1, update.getInstructions())
@@ -294,7 +326,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   public void systemResume() {
     storage.write((NoResult.Quiet) storeProvider -> {
       for (IJobUpdateDetails details
-          : storeProvider.getJobUpdateStore().fetchJobUpdateDetails(ACTIVE_QUERY)) {
+          : storeProvider.getJobUpdateStore().fetchJobUpdates(ACTIVE_QUERY)) {
 
         IJobUpdateSummary summary = details.getUpdate().getSummary();
         IJobUpdateInstructions instructions = details.getUpdate().getInstructions();
@@ -396,7 +428,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery query) {
-    return Iterables.getOnlyElement(store.fetchJobUpdateSummaries(query));
+    return Iterables.getOnlyElement(store.fetchJobUpdates(query)).getUpdate().getSummary();
   }
 
   @VisibleForTesting
@@ -423,13 +455,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     storage.write((NoResult<UpdateStateException>) storeProvider -> {
 
-      IJobUpdateSummary update = Iterables.getOnlyElement(
-          storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdate(key)), null);
-      if (update == null) {
+      Optional<IJobUpdateDetails> update = storeProvider.getJobUpdateStore().fetchJobUpdate(key);
+      if (!update.isPresent()) {
         throw new UpdateStateException("Update does not exist " + key);
       }
 
-      changeUpdateStatus(storeProvider, update, stateChange.apply(update.getState().getStatus()));
+      IJobUpdateSummary summary = update.get().getUpdate().getSummary();
+      changeUpdateStatus(storeProvider, summary, stateChange.apply(summary.getState().getStatus()));
     });
   }
 
@@ -468,6 +500,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
       updateStore.saveJobUpdateEvent(
           key,
           IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
+      jobUpdateEventStats.getUnchecked(status).incrementAndGet();
     }
 
     if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
@@ -487,7 +520,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         checkState(!updates.containsKey(job), "Updater already exists for %s", job);
       }
 
-      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get();
+      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get().getUpdate();
       UpdateFactory.Update update;
       try {
         update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
@@ -556,7 +589,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
 
-    IJobUpdateInstructions instructions = updateStore.fetchJobUpdateInstructions(key).get();
+    IJobUpdateInstructions instructions = updateStore.fetchJobUpdate(key).get()
+        .getUpdate().getInstructions();
     if (isCoordinatedAndPulseExpired(key, instructions)) {
       // Move coordinated update into awaiting pulse state.
       JobUpdateStatus blockedStatus = getBlockedState(summary.getState().getStatus());
@@ -578,7 +612,11 @@ class JobUpdateControllerImpl implements JobUpdateController {
       Iterable<InstanceUpdateStatus> statusChanges;
 
       int instanceId = entry.getKey();
-      List<IJobInstanceUpdateEvent> savedEvents = updateStore.fetchInstanceEvents(key, instanceId);
+      List<IJobInstanceUpdateEvent> savedEvents = updateStore.fetchJobUpdate(key).get()
+          .getInstanceEvents()
+          .stream()
+          .filter(e -> e.getInstanceId() == instanceId)
+          .collect(Collectors.toList());
 
       Set<JobUpdateAction> savedActions =
           FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
@@ -609,6 +647,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
                   .setTimestampMs(clock.nowMillis())
                   .setAction(action));
           updateStore.saveJobInstanceUpdateEvent(summary.getKey(), event);
+          jobUpdateActionStats.getUnchecked(action).incrementAndGet();
         }
       }
     }
@@ -701,11 +740,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
               JobUpdateAction.INSTANCE_ROLLBACK_FAILED)
           .build();
 
-  @VisibleForTesting
-  static IJobUpdateQuery queryByUpdate(IJobUpdateKey key) {
-    return IJobUpdateQuery.build(new JobUpdateQuery().setKey(key.newBuilder()));
-  }
-
   private static JobUpdateEvent newEvent(JobUpdateStatus status) {
     return new JobUpdateEvent().setStatus(status);
   }
@@ -722,7 +756,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance),
         () -> storage.write((NoResult.Quiet) storeProvider -> {
           IJobUpdateSummary summary =
-              getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
+              storeProvider.getJobUpdateStore().fetchJobUpdate(key).get().getUpdate().getSummary();
           JobUpdateStatus status = summary.getState().getStatus();
           // Suppress this evaluation if the updater is not currently active.
           if (JobUpdateStateMachine.isActive(status)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
index 74db5ec..a1bf04a 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
@@ -13,67 +13,157 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.JOB_UPDATES_PRUNED;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
+import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.junit.Assert.assertEquals;
 
-public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+public class JobUpdateHistoryPrunerTest {
+
+  private Storage storage;
+
+  @Before
+  public void setUp() {
+    storage = MemStorageModule.newEmptyStorage();
+  }
+
   @Test
-  public void testExecution() throws Exception {
-    StorageTestUtil storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
+  public void testPruneHistory() {
+    IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
+
+    IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L, 123L);
+    IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L, 124L);
+    IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L, 125L);
+    IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L);
+    IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L, 123L);
+    IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L, 125L);
+    IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"), ROLLING_FORWARD, 126L, 126L);
 
-    final FakeStatsProvider statsProvider = new FakeStatsProvider();
+    long pruningThreshold = 120;
 
-    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
-    FakeScheduledExecutor executorClock =
-        FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+    // No updates pruned.
+    pruneHistory(3, pruningThreshold);
+    assertRetainedUpdates(update1, update2, update3, update4, update5, update6, update7);
 
-    Clock mockClock = createMock(Clock.class);
-    expect(mockClock.nowMillis()).andReturn(2L).times(2);
+    // 1 update pruned.
+    pruneHistory(2, pruningThreshold);
+    assertRetainedUpdates(update1, update3, update4, update5, update6, update7);
 
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
-        .andReturn(ImmutableSet.of(
-            IJobUpdateKey.build(
-                new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1"))));
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of());
+    // 2 update pruned.
+    pruneHistory(1, pruningThreshold);
+    assertRetainedUpdates(update1, update4, update6, update7);
 
-    control.replay();
+    // The oldest update is pruned.
+    pruneHistory(1, 126);
+    assertRetainedUpdates(update1, update4, update7);
 
-    executorClock.assertEmpty();
+    // Nothing survives the 0 per job count.
+    pruneHistory(0, pruningThreshold);
+    assertRetainedUpdates(update1, update7);
+  }
+
+  private void pruneHistory(int retainCount, long pruningThresholdMs) {
+    FakeClock clock = new FakeClock();
+    clock.setNowMillis(100 + pruningThresholdMs);
     JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
-        mockClock,
-        executor,
-        storageUtil.storage,
+        clock,
+        storage,
         new HistoryPrunerSettings(
-            Amount.of(1L, Time.MILLISECONDS),
-            Amount.of(1L, Time.MILLISECONDS),
-            1),
-        statsProvider);
-
-    pruner.startAsync().awaitRunning();
-
-    assertEquals(0L, statsProvider.getValue(JOB_UPDATES_PRUNED));
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
-    assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
-    assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
+            Amount.of(1L, Time.DAYS),
+            Amount.of(100L, Time.MILLISECONDS),
+            retainCount),
+        new FakeStatsProvider());
+    pruner.runForTest();
+  }
+
+  private void assertRetainedUpdates(IJobUpdateDetails... updates) {
+    storage.read(store -> {
+      assertEquals(
+          Stream.of(updates).map(u -> u.getUpdate().getSummary().getKey())
+              .collect(Collectors.toSet()),
+          store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
+              .map(u -> u.getUpdate().getSummary().getKey())
+              .collect(Collectors.toSet()));
+      return null;
+    });
+  }
+
+  private static IJobUpdateKey makeKey(String id) {
+    return makeKey(JOB, id);
+  }
+
+  private static IJobUpdateKey makeKey(IJobKey job, String id) {
+    return IJobUpdateKey.build(new JobUpdateKey().setJob(job.newBuilder()).setId(id));
+  }
+
+  private IJobUpdateDetails makeAndSave(
+      IJobUpdateKey key,
+      JobUpdateStatus status,
+      long createdMs,
+      long lastMs) {
+
+    IJobUpdateDetails update = IJobUpdateDetails.build(new JobUpdateDetails()
+        .setUpdateEvents(ImmutableList.of(
+            new JobUpdateEvent(status, lastMs)
+                .setUser("user")
+                .setMessage("message")
+        ))
+        .setInstanceEvents(ImmutableList.of())
+        .setUpdate(new JobUpdate()
+            .setInstructions(new JobUpdateInstructions()
+                .setDesiredState(new InstanceTaskConfig()
+                    .setTask(new TaskConfig())
+                    .setInstances(ImmutableSet.of(new Range()))))
+            .setSummary(new JobUpdateSummary()
+                .setKey(key.newBuilder())
+                .setState(new JobUpdateState()
+                    .setCreatedTimestampMs(createdMs)
+                    .setLastModifiedTimestampMs(lastMs)
+                    .setStatus(status)))));
+
+    storage.write((NoResult.Quiet) storeProvider -> {
+      JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore();
+      store.saveJobUpdate(update.getUpdate());
+      update.getUpdateEvents().forEach(event -> store.saveJobUpdateEvent(key, event));
+      update.getInstanceEvents().forEach(event -> store.saveJobInstanceUpdateEvent(key, event));
+    });
+    return update;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 5e5c518..2c33c13 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunerSettings;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -92,7 +92,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         executor,
         stateManager,
         clock,
-        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+        new HistoryPrunerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage,
         new Lifecycle(shutdownCommand),
         batchWorker,

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 6be4a9c..c1825f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import java.util.List;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -25,6 +23,7 @@ import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSummary;
@@ -44,8 +43,8 @@ import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -480,16 +479,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().unsetDesiredState();
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().unsetDesiredState();
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -509,16 +503,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().setInitialState(ImmutableSet.of());
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().setInitialState(ImmutableSet.of());
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -538,16 +527,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask("bar", 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().unsetDesiredState();
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().unsetDesiredState();
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -571,8 +555,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(1, 1, 1, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -582,7 +566,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -596,8 +580,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
@@ -607,7 +591,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
@@ -624,8 +608,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
@@ -635,7 +619,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -645,8 +629,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
   @Test
   public void testCheckQuotaNewUpdateSkippedForNonProdDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, false);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -654,15 +638,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaNewUpdateSkippedForDedicatedDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, false);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         prodDedicatedTask("dedicatedJob", 1, 1, 1).getAssignedTask().getTask(),
         1,
         config,
@@ -670,20 +654,20 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaNewUpdateSkippedForEmptyDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
         1);
-    JobUpdate updateBuilder = update.newBuilder();
+    JobUpdate updateBuilder = update.getUpdate().newBuilder();
     updateBuilder.getInstructions().unsetDesiredState();
 
     control.replay();
@@ -844,50 +828,39 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
   private void expectJobUpdates(
       ITaskConfig initial,
-      int intialInstances,
+      int initialInstances,
       ITaskConfig desired,
       int desiredInstances,
       int times) {
 
     IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey(initial.getJob().newBuilder(), "u1"));
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(key);
-    IJobUpdate update =
-        buildJobUpdate(summaries.get(0), initial, intialInstances, desired, desiredInstances);
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getJob().getRole())))
-        .andReturn(summaries)
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(initial.getJob().getRole())))
+        .andReturn(ImmutableList.of(
+            buildJobUpdate(key, initial, initialInstances, desired, desiredInstances)))
         .times(times);
-
-    expect(jobUpdateStore.fetchJobUpdate(key)).andReturn(Optional.of(update)).times(times);
-
-  }
-
-  private List<IJobUpdateSummary> buildJobUpdateSummaries(IJobUpdateKey key) {
-    return ImmutableList.of(IJobUpdateSummary.build(
-        new JobUpdateSummary().setKey(key.newBuilder())));
   }
 
-  private IJobUpdate buildJobUpdate(
-      IJobUpdateSummary summary,
+  private IJobUpdateDetails buildJobUpdate(
+      IJobUpdateKey key,
       ITaskConfig initial,
       int intialInstances,
       ITaskConfig desired,
       int desiredInstances) {
 
-    return IJobUpdate.build(new JobUpdate()
-        .setSummary(summary.newBuilder())
-        .setInstructions(new JobUpdateInstructions()
-            .setDesiredState(new InstanceTaskConfig()
-                .setTask(desired.newBuilder())
-                .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
-            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
-                .setTask(initial.newBuilder())
-                .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1)))))));
+    return IJobUpdateDetails.build(new JobUpdateDetails()
+        .setUpdate(new JobUpdate()
+            .setSummary(new JobUpdateSummary().setKey(key.newBuilder()))
+            .setInstructions(new JobUpdateInstructions()
+                .setDesiredState(new InstanceTaskConfig()
+                    .setTask(desired.newBuilder())
+                    .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
+                .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+                    .setTask(initial.newBuilder())
+                    .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1))))))));
   }
 
   private IExpectationSetters<?> expectNoJobUpdates() {
-    return expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(ROLE)))
-        .andReturn(ImmutableList.of());
+    return expect(jobUpdateStore.fetchJobUpdates(updateQuery(ROLE))).andReturn(ImmutableList.of());
   }
 
   private IExpectationSetters<?> expectNoTasks() {


Mime
View raw message