aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] incubator-aurora git commit: Refactor existing write APIs for job updates to use IJobUpdateKey.
Date Sat, 21 Feb 2015 01:44:42 GMT
Refactor existing write APIs for job updates to use IJobUpdateKey.

Bugs closed: AURORA-1093

Reviewed at https://reviews.apache.org/r/31170/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ace02d13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ace02d13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ace02d13

Branch: refs/heads/master
Commit: ace02d1316786f45b63048f5da6cc90fa69da9e7
Parents: e5de618
Author: Bill Farner <wfarner@apache.org>
Authored: Fri Feb 20 17:43:38 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Fri Feb 20 17:43:38 2015 -0800

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/storage.thrift |   5 +-
 .../scheduler/async/JobUpdateHistoryPruner.java |   3 +-
 .../aurora/scheduler/quota/QuotaManager.java    |   4 +-
 .../scheduler/storage/ForwardingStore.java      |   6 +
 .../scheduler/storage/JobUpdateStore.java       |  30 ++-
 .../scheduler/storage/db/DBJobUpdateStore.java  |  37 ++--
 .../db/JobInstanceUpdateEventMapper.java        |   4 +-
 .../scheduler/storage/log/LogStorage.java       |  79 ++++---
 .../storage/log/SnapshotStoreImpl.java          |  10 +-
 .../storage/log/WriteAheadStorage.java          |  26 +--
 .../thrift/SchedulerThriftInterface.java        |   4 +-
 .../scheduler/updater/JobUpdateController.java  |  15 +-
 .../updater/JobUpdateControllerImpl.java        | 115 +++++-----
 .../aurora/scheduler/updater/Updates.java       |  49 +++++
 .../async/JobUpdateHistoryPrunerTest.java       |  11 +-
 .../storage/db/DBJobUpdateStoreTest.java        | 216 ++++++++++---------
 .../scheduler/storage/log/LogStorageTest.java   |  64 ++++--
 .../storage/log/SnapshotStoreImplTest.java      |  29 ++-
 .../storage/log/WriteAheadStorageTest.java      |  10 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |  19 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |  17 +-
 21 files changed, 452 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 3798797..31b6981 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -80,13 +80,16 @@ struct StoredJobUpdateDetails {
 
 struct SaveJobUpdateEvent {
   1: api.JobUpdateEvent event
-  /** ID of the lock associated with this update. */
+  // TODO(wfarner): Remove this in 0.9.0.
   2: string updateId
+  3: api.JobUpdateKey key
 }
 
 struct SaveJobInstanceUpdateEvent {
   1: api.JobInstanceUpdateEvent event
+  // TODO(wfarner): Remove this in 0.9.0.
   2: string updateId
+  3: api.JobUpdateKey key
 }
 
 struct PruneJobUpdateHistory {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
index 7a349bb..b416343 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -29,6 +29,7 @@ import com.twitter.common.util.Clock;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 
 import static java.util.Objects.requireNonNull;
 
@@ -81,7 +82,7 @@ class JobUpdateHistoryPruner extends AbstractIdleService {
             storage.write(new MutateWork.NoResult.Quiet() {
               @Override
               public void execute(MutableStoreProvider storeProvider) {
-                Set<String> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
+                Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
                     settings.maxUpdatesPerJob,
                     clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 934b920..dab8c05 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -54,7 +54,7 @@ import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.updater.JobUpdateController;
+import org.apache.aurora.scheduler.updater.Updates;
 
 import static java.util.Objects.requireNonNull;
 
@@ -362,7 +362,7 @@ public interface QuotaManager {
     static IJobUpdateQuery updateQuery(String role) {
       return IJobUpdateQuery.build(new JobUpdateQuery()
           .setRole(role)
-          .setUpdateStatuses(JobUpdateController.ACTIVE_JOB_UPDATE_STATES));
+          .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
     }
 
     private static RangeSet<Integer> instanceRangeSet(Set<IInstanceTaskConfig> configs) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 3ca150c..4d83c58 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
@@ -144,6 +145,11 @@ public class ForwardingStore implements
   }
 
   @Override
+  public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) {
+    return jobUpdateStore.fetchUpdateKey(updateId);
+  }
+
+  @Override
   public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
     return jobUpdateStore.fetchJobUpdateSummaries(query);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index 629a39b..4b8b00c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -24,6 +24,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 
@@ -82,6 +83,18 @@ public interface JobUpdateStore {
   Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
 
   /**
+   * Fetches an update key based on the update ID.
+   *
+   * <p>
+   * This is a compatibility shim for when updates were only identified by a string, and should
+   * be removed in 0.9.0.
+   *
+   * @param updateId Update identifier.
+   * @return The update key matching {@code updateId}.
+   */
+  Optional<IJobUpdateKey> fetchUpdateKey(String updateId);
+
+  /**
    * Gets the lock token associated with a job update.
    *
    * @param updateId Job update ID.
@@ -104,8 +117,9 @@ public interface JobUpdateStore {
      * Saves a new job update.
      *
      * <p>
-     * Note: This call must be followed by the {@link #saveJobUpdateEvent(IJobUpdateEvent, String)}
-     * before fetching a saved update as it does not save the following required fields:
+     * Note: This call must be followed by the
+     * {@link #saveJobUpdateEvent(IJobUpdateKey, IJobUpdateEvent)} before fetching a saved update as
+     * it does not save the following required fields:
      * <ul>
      *   <li>{@link org.apache.aurora.gen.JobUpdateState#status}</li>
      *   <li>{@link org.apache.aurora.gen.JobUpdateState#createdTimestampMs}</li>
@@ -124,18 +138,18 @@ public interface JobUpdateStore {
     /**
      * Saves a new job update event.
      *
+     * @param key Update identifier.
      * @param event Job update event to save.
-     * @param updateId Job update ID.
      */
-    void saveJobUpdateEvent(IJobUpdateEvent event, String updateId);
+    void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event);
 
     /**
      * Saves a new job instance update event.
      *
+     * @param key Update identifier.
      * @param event Job instance update event.
-     * @param updateId Job update ID.
      */
-    void saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent event, String updateId);
+    void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event);
 
     /**
      * Deletes all updates and update events from the store.
@@ -152,8 +166,8 @@ public interface JobUpdateStore {
      * @param historyPruneThresholdMs Earliest timestamp in the past to retain history.
      *                                Any completed updates created before this timestamp
      *                                will be pruned.
-     * @return Set of pruned update IDs.
+     * @return Set of pruned update keys.
      */
-    Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
+    Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index 34c4ab5..d52c15e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -30,7 +30,6 @@ import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
@@ -126,24 +125,14 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
 
   @Timed("job_update_store_save_event")
   @Override
-  public void saveJobUpdateEvent(IJobUpdateEvent event, String updateId) {
-    Optional<IJobUpdateKey> key = fetchUpdateKey(updateId);
-    if (key.isPresent()) {
-      jobEventMapper.insert(key.get(), event.newBuilder());
-    } else {
-      throw new StorageException("No update to associate with update ID " + updateId);
-    }
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    jobEventMapper.insert(key, event.newBuilder());
   }
 
   @Timed("job_update_store_save_instance_event")
   @Override
-  public void saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent event, String updateId) {
-    Optional<IJobUpdateKey> key = fetchUpdateKey(updateId);
-    if (key.isPresent()) {
-      instanceEventMapper.insert(event.newBuilder(), key.get());
-    } else {
-      throw new StorageException("No update to associate with update ID " + updateId);
-    }
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    instanceEventMapper.insert(key, event.newBuilder());
   }
 
   @Timed("job_update_store_delete_all")
@@ -159,18 +148,18 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
     }
   };
 
-  private static final Function<PruneVictim, String> GET_UPDATE_ID =
-      new Function<PruneVictim, String>() {
+  private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY =
+      new Function<PruneVictim, IJobUpdateKey>() {
         @Override
-        public String apply(PruneVictim victim) {
-          return victim.getUpdate().getId();
+        public IJobUpdateKey apply(PruneVictim victim) {
+          return IJobUpdateKey.build(victim.getUpdate());
         }
       };
 
   @Timed("job_update_store_prune_history")
   @Override
-  public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
-    ImmutableSet.Builder<String> pruned = ImmutableSet.builder();
+  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+    ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder();
 
     Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
         perJobRetainCount,
@@ -184,7 +173,7 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
 
       detailsMapper.deleteCompletedUpdates(
           FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet());
-      pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_ID));
+      pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY));
     }
 
     return pruned.build();
@@ -266,7 +255,9 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
     return ImmutableSet.copyOf(detailsMapper.selectAllDetails());
   }
 
-  private Optional<IJobUpdateKey> fetchUpdateKey(String updateId) {
+  @Timed("job_update_store_fetch_update_key")
+  @Override
+  public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) {
     return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId))
         .transform(IJobUpdateKey.FROM_BUILDER);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java
index 4fb33bd..591b781 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java
@@ -27,8 +27,8 @@ interface JobInstanceUpdateEventMapper {
   /**
    * Inserts a new job instance update event into the database.
    *
-   * @param event Event to insert.
    * @param key Update key of the event.
+   * @param event Event to insert.
    */
-  void insert(@Param("event") JobInstanceUpdateEvent event, @Param("key") IJobUpdateKey key);
+  void insert(@Param("key") IJobUpdateKey key, @Param("event") JobInstanceUpdateEvent event);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index ba1672f..384e9b5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -38,14 +39,18 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.stats.Stats;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.RewriteTask;
 import org.apache.aurora.gen.storage.SaveAcceptedJob;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
 import org.apache.aurora.gen.storage.SaveQuota;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.AsyncUtil;
@@ -70,6 +75,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -201,6 +207,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   private final SlidingStats writerWaitStats =
       new SlidingStats("log_storage_write_lock_wait", "ns");
+  private final AtomicLong droppedUpdateEvents = Stats.exportLong("dropped_update_events");
 
   /**
    * Identifies a local storage layer that is written to only after first ensuring the write
@@ -314,7 +321,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
         .put(LogEntry._Fields.SNAPSHOT, new Closure<LogEntry>() {
           @Override
-          public void execute(LogEntry logEntry) throws RuntimeException {
+          public void execute(LogEntry logEntry) {
             Snapshot snapshot = logEntry.getSnapshot();
             LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
             snapshotStore.applySnapshot(snapshot);
@@ -322,7 +329,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(LogEntry._Fields.TRANSACTION, new Closure<LogEntry>() {
           @Override
-          public void execute(final LogEntry logEntry) throws RuntimeException {
+          public void execute(final LogEntry logEntry) {
             write(new MutateWork.NoResult.Quiet() {
               @Override
               protected void execute(MutableStoreProvider unused) {
@@ -335,7 +342,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(LogEntry._Fields.NOOP, new Closure<LogEntry>() {
           @Override
-          public void execute(LogEntry item) throws RuntimeException {
+          public void execute(LogEntry item) {
             // Nothing to do here
           }
         })
@@ -347,13 +354,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
     return ImmutableMap.<Op._Fields, Closure<Op>>builder()
         .put(Op._Fields.SAVE_FRAMEWORK_ID, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
           }
         })
         .put(Op._Fields.SAVE_ACCEPTED_JOB, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
             writeBehindJobStore.saveAcceptedJob(
                 acceptedJob.getManagerId(),
@@ -362,20 +369,20 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(Op._Fields.REMOVE_JOB, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
           }
         })
         .put(Op._Fields.SAVE_TASKS, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindTaskStore.saveTasks(
                 IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
           }
         })
         .put(Op._Fields.REWRITE_TASK, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             RewriteTask rewriteTask = op.getRewriteTask();
             writeBehindTaskStore.unsafeModifyInPlace(
                 rewriteTask.getTaskId(),
@@ -384,13 +391,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(Op._Fields.REMOVE_TASKS, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
           }
         })
         .put(Op._Fields.SAVE_QUOTA, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             SaveQuota saveQuota = op.getSaveQuota();
             writeBehindQuotaStore.saveQuota(
                 saveQuota.getRole(),
@@ -399,13 +406,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(Op._Fields.REMOVE_QUOTA, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
           }
         })
         .put(Op._Fields.SAVE_HOST_ATTRIBUTES, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
             // Prior to commit 5cf760b, the store would persist maintenance mode changes for
             // unknown hosts.  5cf760b began rejecting these, but the replicated log may still
@@ -419,19 +426,19 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(Op._Fields.SAVE_LOCK, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
           }
         })
         .put(Op._Fields.REMOVE_LOCK, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
           }
         })
         .put(Op._Fields.SAVE_JOB_UPDATE, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindJobUpdateStore.saveJobUpdate(
                 IJobUpdate.build(op.getSaveJobUpdate().getJobUpdate()),
                 Optional.fromNullable(op.getSaveJobUpdate().getLockToken()));
@@ -439,23 +446,43 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         })
         .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
-            writeBehindJobUpdateStore.saveJobUpdateEvent(
-                IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()),
-                op.getSaveJobUpdateEvent().getUpdateId());
+          public void execute(Op op) {
+            SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+            if (!event.isSetKey()) {
+              event.setKey(getUpdateKeyById(event.getUpdateId()).orNull());
+            }
+
+            if (event.isSetKey()) {
+              writeBehindJobUpdateStore.saveJobUpdateEvent(
+                  IJobUpdateKey.build(event.getKey()),
+                  IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+            } else {
+              LOG.severe("Dropping unidentifiable op: " + op);
+              droppedUpdateEvents.incrementAndGet();
+            }
           }
         })
         .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
-            writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent.build(
-                op.getSaveJobInstanceUpdateEvent().getEvent()),
-                op.getSaveJobInstanceUpdateEvent().getUpdateId());
+          public void execute(Op op) {
+            SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+            if (!event.isSetKey()) {
+              event.setKey(getUpdateKeyById(event.getUpdateId()).orNull());
+            }
+
+            if (event.isSetKey()) {
+              writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+                  IJobUpdateKey.build(event.getKey()),
+                  IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+            } else {
+              LOG.severe("Dropping unidentifiable op: " + op);
+              droppedUpdateEvents.incrementAndGet();
+            }
           }
         })
         .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, new Closure<Op>() {
           @Override
-          public void execute(Op op) throws RuntimeException {
+          public void execute(Op op) {
             writeBehindJobUpdateStore.pruneHistory(
                 op.getPruneJobUpdateHistory().getPerJobRetainCount(),
                 op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs());
@@ -463,6 +490,10 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         }).build();
   }
 
+  private Optional<JobUpdateKey> getUpdateKeyById(String updateId) {
+    return writeBehindJobUpdateStore.fetchUpdateKey(updateId).transform(IJobUpdateKey.TO_BUILDER);
+  }
+
   @Override
   public synchronized void prepare() {
     writeBehindStorage.prepare();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index ea33037..9b38b7d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -50,9 +50,11 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.updater.Updates;
 
 import static java.util.Objects.requireNonNull;
 
@@ -226,16 +228,16 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
               if (details.getUpdateEventsSize() > 0) {
                 for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
                   updateStore.saveJobUpdateEvent(
-                      IJobUpdateEvent.build(updateEvent),
-                      details.getUpdate().getSummary().getUpdateId());
+                      Updates.getKey(IJobUpdateSummary.build(details.getUpdate().getSummary())),
+                      IJobUpdateEvent.build(updateEvent));
                 }
               }
 
               if (details.getInstanceEventsSize() > 0) {
                 for (JobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
                   updateStore.saveJobInstanceUpdateEvent(
-                      IJobInstanceUpdateEvent.build(instanceEvent),
-                      details.getUpdate().getSummary().getUpdateId());
+                      Updates.getKey(IJobUpdateSummary.build(details.getUpdate().getSummary())),
+                      IJobInstanceUpdateEvent.build(instanceEvent));
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 2d27599..262d72c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.twitter.common.base.MorePreconditions;
 
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
@@ -60,6 +59,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -276,28 +276,28 @@ class WriteAheadStorage extends ForwardingStore implements
   }
 
   @Override
-  public void saveJobUpdateEvent(IJobUpdateEvent event, String updateId) {
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    requireNonNull(key);
     requireNonNull(event);
-    MorePreconditions.checkNotBlank(updateId);
 
-    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), updateId)));
-    jobUpdateStore.saveJobUpdateEvent(event, updateId);
+    write(Op.saveJobUpdateEvent(
+        new SaveJobUpdateEvent(event.newBuilder(), key.getId(), key.newBuilder())));
+    jobUpdateStore.saveJobUpdateEvent(key, event);
   }
 
   @Override
-  public void saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent event, String updateId) {
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    requireNonNull(key);
     requireNonNull(event);
-    MorePreconditions.checkNotBlank(updateId);
 
-    write(Op.saveJobInstanceUpdateEvent(new SaveJobInstanceUpdateEvent(
-        event.newBuilder(),
-        updateId)));
-    jobUpdateStore.saveJobInstanceUpdateEvent(event, updateId);
+    write(Op.saveJobInstanceUpdateEvent(
+        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.getId(), key.newBuilder())));
+    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
   }
 
   @Override
-  public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
-    Set<String> prunedUpdates = jobUpdateStore.pruneHistory(
+  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+    Set<IJobUpdateKey> prunedUpdates = jobUpdateStore.pruneHistory(
         perJobRetainCount,
         historyPruneThresholdMs);
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 2d21a97..3e491d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -1312,9 +1312,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey);
     try {
       authorizeJobUpdateAction(updateKey, session);
-
-      // TODO(maxim): use IJobUpdateKey to pulse when AURORA-1093 is addressed.
-      JobUpdatePulseStatus result = jobUpdateController.pulse(updateKey.getId());
+      JobUpdatePulseStatus result = jobUpdateController.pulse(updateKey);
       return ok(Result.pulseJobUpdateResult(new PulseJobUpdateResult(result)));
     } catch (AuthFailedException e) {
       return error(AUTH_FAILED, e);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index d0def6e..5989a62 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -13,14 +13,11 @@
  */
 package org.apache.aurora.scheduler.updater;
 
-import java.util.EnumSet;
-
 import org.apache.aurora.gen.JobUpdatePulseStatus;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 /**
@@ -29,12 +26,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 public interface JobUpdateController {
 
   /**
-   * Different states that an active job update may be in.
-   */
-  EnumSet<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES =
-      EnumSet.copyOf(apiConstants.ACTIVE_JOB_UPDATE_STATES);
-
-  /**
    * Initiates an update.
    *
    * @param update Instructions for what job to update, and how to update it.
@@ -108,9 +99,9 @@ public interface JobUpdateController {
    * {@link org.apache.aurora.gen.JobUpdateSettings#getBlockIfNoPulsesAfterMs}. Unblocks progress
    * if the update was previously blocked.
    *
-   * @param updateId Job update ID being pulsed.
+   * @param key Update identifier.
    * @return Job update pulse status.
    * @throws UpdateStateException If there is no update found or update is not coordinated.
    */
-  JobUpdatePulseStatus pulse(String updateId) throws UpdateStateException;
+  JobUpdatePulseStatus pulse(IJobUpdateKey key) throws UpdateStateException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index f39d876..d1bd3c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
-
 import com.twitter.common.collections.Pair;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -59,6 +58,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
@@ -164,7 +164,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         if (!activeJobUpdates.isEmpty()) {
           throw new UpdateStateException("An active update already exists for this job, "
               + "please terminate it before starting another. "
-              + "Active updates are those in states " + ACTIVE_JOB_UPDATE_STATES);
+              + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES);
         }
 
         LOG.info("Starting update for job " + job);
@@ -188,8 +188,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
         recordAndChangeJobUpdateStatus(
             storeProvider,
-            summary.getUpdateId(),
-            job,
+            Updates.getKey(summary),
             status,
             Optional.of(updatingUser));
       }
@@ -218,9 +217,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
         }
 
         IJobUpdate update = details.getUpdate();
-        String updateId = update.getSummary().getUpdateId();
+        IJobUpdateKey key = Updates.getKey(update.getSummary());
         Function<JobUpdateStatus, JobUpdateStatus> stateChange =
-            isCoordinatedAndPulseExpired(updateId, update.getInstructions())
+            isCoordinatedAndPulseExpired(key, update.getInstructions())
                 ? GET_BLOCKED_RESUME_STATE
                 : GET_ACTIVE_RESUME_STATE;
 
@@ -251,33 +250,32 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
           IJobUpdateSummary summary = details.getUpdate().getSummary();
           IJobUpdateInstructions instructions = details.getUpdate().getInstructions();
-          String updateId = summary.getUpdateId();
-          IJobKey job = summary.getJobKey();
+          IJobUpdateKey key = Updates.getKey(summary);
           JobUpdateStatus status = summary.getState().getStatus();
 
-          LOG.info("Automatically resuming update " + JobKeys.canonicalString(job));
+          LOG.info("Automatically resuming update " + key);
 
           if (isCoordinatedUpdate(instructions)) {
             pulseHandler.initializePulseState(details.getUpdate(), status);
           }
 
-          changeJobUpdateStatus(storeProvider, updateId, job, status, NO_USER, false);
+          changeJobUpdateStatus(storeProvider, key, status, NO_USER, false);
         }
       }
     });
   }
 
   @Override
-  public JobUpdatePulseStatus pulse(final String updateId) throws UpdateStateException {
-    final PulseState state = pulseHandler.pulseAndGet(updateId);
+  public JobUpdatePulseStatus pulse(IJobUpdateKey key) throws UpdateStateException {
+    final PulseState state = pulseHandler.pulseAndGet(key);
     if (state == null) {
-      LOG.info("Not pulsing inactive job update: " + updateId);
+      LOG.info("Not pulsing inactive job update: " + key);
       return JobUpdatePulseStatus.FINISHED;
     }
 
-    LOG.info(String.format(
+    LOG.fine(String.format(
         "Job update %s has been pulsed. Timeout of %d msec is reset.",
-        updateId,
+        key,
         state.getPulseTimeoutMs()));
 
     if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) {
@@ -347,7 +345,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   static IJobUpdateQuery queryActiveByJob(IJobKey job) {
     return IJobUpdateQuery.build(new JobUpdateQuery()
         .setJobKey(job.newBuilder())
-        .setUpdateStatuses(ACTIVE_JOB_UPDATE_STATES));
+        .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
   }
 
   @VisibleForTesting
@@ -402,22 +400,16 @@ class JobUpdateControllerImpl implements JobUpdateController {
     }
 
     assertTransitionAllowed(updateSummary.getState().getStatus(), newStatus);
-    recordAndChangeJobUpdateStatus(
-        storeProvider,
-        updateSummary.getUpdateId(),
-        updateSummary.getJobKey(),
-        newStatus,
-        user);
+    recordAndChangeJobUpdateStatus(storeProvider, Updates.getKey(updateSummary), newStatus, user);
   }
 
   private void recordAndChangeJobUpdateStatus(
       MutableStoreProvider storeProvider,
-      String updateId,
-      IJobKey job,
+      IJobUpdateKey key,
       JobUpdateStatus status,
       Optional<String> user) {
 
-    changeJobUpdateStatus(storeProvider, updateId, job, status, user, true);
+    changeJobUpdateStatus(storeProvider, key, status, user, true);
   }
 
   private static final Set<JobUpdateStatus> TERMINAL_STATES = ImmutableSet.of(
@@ -430,8 +422,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
   private void changeJobUpdateStatus(
       MutableStoreProvider storeProvider,
-      String updateId,
-      IJobKey job,
+      IJobUpdateKey key,
       JobUpdateStatus newStatus,
       Optional<String> user,
       boolean recordChange) {
@@ -440,40 +431,40 @@ class JobUpdateControllerImpl implements JobUpdateController {
     boolean record;
 
     JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
-    Optional<String> updateLock = updateStore.getLockToken(updateId);
+    Optional<String> updateLock = updateStore.getLockToken(key.getId());
     if (updateLock.isPresent()) {
       status = newStatus;
       record = recordChange;
     } else {
-      LOG.severe("Update " + updateId + " does not have a lock");
+      LOG.severe("Update " + key + " does not have a lock");
       status = ERROR;
       record = true;
     }
 
-    LOG.info(String.format(
-        "Update %s %s is now in state %s", JobKeys.canonicalString(job), updateId, status));
+    LOG.info(String.format("Update %s is now in state %s", key, status));
     if (record) {
       updateStore.saveJobUpdateEvent(
+          key,
           IJobUpdateEvent.build(new JobUpdateEvent()
               .setStatus(status)
               .setUser(user.orNull())
-              .setTimestampMs(clock.nowMillis())),
-          updateId);
+              .setTimestampMs(clock.nowMillis())));
     }
 
     if (TERMINAL_STATES.contains(status)) {
       if (updateLock.isPresent()) {
         lockManager.releaseLock(ILock.build(new Lock()
-            .setKey(LockKey.job(job.newBuilder()))
+            .setKey(LockKey.job(key.getJob().newBuilder()))
             .setToken(updateLock.get())));
       }
 
-      pulseHandler.remove(updateId);
+      pulseHandler.remove(key);
     } else {
-      pulseHandler.updatePulseStatus(updateId, status);
+      pulseHandler.updatePulseStatus(key, status);
     }
 
     MonitorAction action = JobUpdateStateMachine.getActionForStatus(status);
+    IJobKey job = key.getJob();
     if (action == STOP_WATCHING) {
       updates.remove(job);
     } else if (action == ROLL_FORWARD || action == ROLL_BACK) {
@@ -483,13 +474,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
         checkState(!updates.containsKey(job), "Updater already exists for " + job);
       }
 
-      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(updateId).get();
+      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key.getId()).get();
       UpdateFactory.Update update;
       try {
         update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
       } catch (RuntimeException e) {
         LOG.log(Level.WARNING, "Uncaught exception: " + e, e);
-        changeJobUpdateStatus(storeProvider, updateId, job, ERROR, user, true);
+        changeJobUpdateStatus(storeProvider, key, ERROR, user, true);
         return;
       }
       updates.put(job, update);
@@ -518,13 +509,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private boolean isCoordinatedAndPulseExpired(
-      String updateId,
+      IJobUpdateKey key,
       IJobUpdateInstructions instructions) {
 
     if (isCoordinatedUpdate(instructions)) {
-      PulseState pulseState = pulseHandler.get(updateId);
+      PulseState pulseState = pulseHandler.get(key);
       boolean result = pulseState == null || pulseState.isBlocked(clock);
-      LOG.info(String.format("Coordinated update %s pulse expired: %s", updateId, result));
+      LOG.info(String.format("Coordinated update %s pulse expired: %s", key, result));
       return result;
     } else {
       return false;
@@ -538,18 +529,18 @@ class JobUpdateControllerImpl implements JobUpdateController {
       Map<Integer, Optional<IScheduledTask>> changedInstance) {
 
     JobUpdateStatus updaterStatus = summary.getState().getStatus();
-    final IJobKey job = summary.getJobKey();
+    final IJobUpdateKey key = Updates.getKey(summary);
 
-    final JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
+    JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
     if (!updateStore.getLockToken(summary.getUpdateId()).isPresent()) {
-      recordAndChangeJobUpdateStatus(storeProvider, summary.getUpdateId(), job, ERROR, NO_USER);
+      recordAndChangeJobUpdateStatus(storeProvider, key, ERROR, NO_USER);
       return;
     }
 
     IJobUpdateInstructions instructions =
         updateStore.fetchJobUpdateInstructions(summary.getUpdateId()).get();
 
-    if (isCoordinatedAndPulseExpired(summary.getUpdateId(), instructions)) {
+    if (isCoordinatedAndPulseExpired(key, instructions)) {
       // Move coordinated update into awaiting pulse state.
       JobUpdateStatus blockedStatus = getBlockedState(summary.getState().getStatus());
       changeUpdateStatus(storeProvider, summary, blockedStatus, Optional.of(INTERNAL_USER));
@@ -560,13 +551,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
         new InstanceStateProvider<Integer, Optional<IScheduledTask>>() {
           @Override
           public Optional<IScheduledTask> getState(Integer instanceId) {
-            return getActiveInstance(storeProvider.getTaskStore(), job, instanceId);
+            return getActiveInstance(storeProvider.getTaskStore(), key.getJob(), instanceId);
           }
         };
 
     EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider);
 
-    LOG.info(JobKeys.canonicalString(job) + " evaluation result: " + result);
+    LOG.info(key + " evaluation result: " + result);
 
     for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
       Iterable<InstanceUpdateStatus> statusChanges;
@@ -597,7 +588,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
                 .setInstanceId(entry.getKey())
                 .setTimestampMs(clock.nowMillis())
                 .setAction(action));
-        updateStore.saveJobInstanceUpdateEvent(event, summary.getUpdateId());
+        updateStore.saveJobInstanceUpdateEvent(Updates.getKey(summary), event);
       }
     }
 
@@ -614,9 +605,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
         changeUpdateStatus(storeProvider, summary, update.getFailureStatus(), NO_USER);
       }
     } else {
-      LOG.info("Executing side-effects for update of " + job + ": " + result.getSideEffects());
+      LOG.info("Executing side-effects for update of " + key + ": " + result.getSideEffects());
       for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
-        IInstanceKey instance = InstanceKeys.from(job, entry.getKey());
+        IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());
 
         Optional<InstanceAction> action = entry.getValue().getAction();
         if (action.isPresent()) {
@@ -700,24 +691,24 @@ class JobUpdateControllerImpl implements JobUpdateController {
     // Currently active coordinated update pulse states. A pulse state is added when a coordinated
     // update is created and removed only when an update reaches terminal state. A PAUSED update
     // pulse state is still retained in the map and accepts pulses.
-    private final Map<String, PulseState> pulseMap = Maps.newHashMap();
+    private final Map<IJobUpdateKey, PulseState> pulseStates = Maps.newHashMap();
 
     PulseHandler(Clock clock) {
       this.clock = requireNonNull(clock);
     }
 
     synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status) {
-      pulseMap.put(update.getSummary().getUpdateId(), new PulseState(
+      pulseStates.put(Updates.getKey(update.getSummary()), new PulseState(
           update.getSummary().getJobKey(),
           status,
           update.getInstructions().getSettings().getBlockIfNoPulsesAfterMs(),
           0L));
     }
 
-    synchronized PulseState pulseAndGet(String updateId) {
-      PulseState state = pulseMap.get(updateId);
+    synchronized PulseState pulseAndGet(IJobUpdateKey key) {
+      PulseState state = pulseStates.get(key);
       if (state != null) {
-        state = pulseMap.put(updateId, new PulseState(
+        state = pulseStates.put(key, new PulseState(
             state.getJobKey(),
             state.getStatus(),
             state.getPulseTimeoutMs(),
@@ -726,10 +717,10 @@ class JobUpdateControllerImpl implements JobUpdateController {
       return state;
     }
 
-    synchronized void updatePulseStatus(String updateId, JobUpdateStatus status) {
-      PulseState state = pulseMap.get(updateId);
+    synchronized void updatePulseStatus(IJobUpdateKey key, JobUpdateStatus status) {
+      PulseState state = pulseStates.get(key);
       if (state != null) {
-        pulseMap.put(updateId, new PulseState(
+        pulseStates.put(key, new PulseState(
             state.getJobKey(),
             status,
             state.getPulseTimeoutMs(),
@@ -737,12 +728,12 @@ class JobUpdateControllerImpl implements JobUpdateController {
       }
     }
 
-    synchronized void remove(String updateId) {
-      pulseMap.remove(updateId);
+    synchronized void remove(IJobUpdateKey key) {
+      pulseStates.remove(key);
     }
 
-    synchronized PulseState get(String updateId) {
-      return pulseMap.get(updateId);
+    synchronized PulseState get(IJobUpdateKey key) {
+      return pulseStates.get(key);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
new file mode 100644
index 0000000..6493d68
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utility functions for job updates.
+ */
+public final class Updates {
+  private Updates() {
+    // Utility class.
+  }
+
+  /**
+   * Different states that an active job update may be in.
+   */
+  public static final Set<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES =
+      Sets.immutableEnumSet(apiConstants.ACTIVE_JOB_UPDATE_STATES);
+
+  public static IJobUpdateKey getKey(IJobUpdateSummary summary) {
+    return IJobUpdateKey.build(
+        new JobUpdateKey(
+            JobKeys.assertValid(summary.getJobKey()).newBuilder(),
+            requireNonNull(summary.getUpdateId())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
index 1cd693a..02e8798 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
@@ -21,7 +21,10 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.scheduler.async.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.junit.Test;
@@ -41,8 +44,12 @@ public class JobUpdateHistoryPrunerTest extends EasyMockTest {
     Clock mockClock = createMock(Clock.class);
     expect(mockClock.nowMillis()).andReturn(2L).times(2);
 
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of("id1", "id2"));
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.<String>of());
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
+        .andReturn(ImmutableSet.of(
+            IJobUpdateKey.build(
+                new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1"))));
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
+        .andReturn(ImmutableSet.<IJobUpdateKey>of());
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ace02d13/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index 156cbc4..f4c92e6 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -53,9 +53,11 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.updater.Updates;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -97,11 +99,11 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveJobUpdates() {
-    String updateId1 = "u1";
-    String updateId2 = "u2";
+    IJobUpdateKey updateId1 = makeKey(JobKeys.from("role", "env", "name1"), "u1");
+    IJobUpdateKey updateId2 = makeKey(JobKeys.from("role", "env", "name2"), "u2");
 
-    IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1);
-    IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2);
+    IJobUpdate update1 = makeJobUpdate(updateId1);
+    IJobUpdate update2 = makeJobUpdate(updateId2);
 
     assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId1));
     assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId2));
@@ -113,7 +115,8 @@ public class DBJobUpdateStoreTest {
     assertUpdate(update2);
 
     // Colliding update IDs should be forbidden.
-    IJobUpdate update3 = makeJobUpdate(JobKeys.from("role", "env", "name3"), updateId2);
+    IJobUpdate update3 =
+        makeJobUpdate(makeKey(JobKeys.from("role", "env", "name3"), updateId2.getId()));
     try {
       saveUpdate(update3, Optional.<String>absent());
       fail("Update ID collision should not be allowed");
@@ -124,7 +127,7 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveNullInitialState() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().unsetInitialState();
 
     // Save with null initial state instances.
@@ -136,7 +139,7 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveNullDesiredState() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().unsetDesiredState();
 
     // Save with null desired state instances.
@@ -147,7 +150,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testSaveBothInitialAndDesiredMissingThrows() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().unsetInitialState();
     builder.getInstructions().unsetDesiredState();
 
@@ -156,7 +159,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = NullPointerException.class)
   public void testSaveNullInitialStateTaskThrows() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().getInitialState().add(
         new InstanceTaskConfig(null, ImmutableSet.<Range>of()));
 
@@ -165,7 +168,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testSaveEmptyInitialStateRangesThrows() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().getInitialState().add(
         new InstanceTaskConfig(new TaskConfig(), ImmutableSet.<Range>of()));
 
@@ -174,7 +177,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = NullPointerException.class)
   public void testSaveNullDesiredStateTaskThrows() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().getDesiredState().setTask(null);
 
     saveUpdate(IJobUpdate.build(builder), Optional.of("lock"));
@@ -182,7 +185,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testSaveEmptyDesiredStateRangesThrows() {
-    JobUpdate builder = makeJobUpdate(JOB, "u1").newBuilder();
+    JobUpdate builder = makeJobUpdate(makeKey("u1")).newBuilder();
     builder.getInstructions().getDesiredState().setInstances(ImmutableSet.<Range>of());
 
     saveUpdate(IJobUpdate.build(builder), Optional.of("lock"));
@@ -190,9 +193,9 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveJobUpdateEmptyInstanceOverrides() {
-    String updateId = "u1";
+    IJobUpdateKey updateId = makeKey("u1");
 
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdate update = makeJobUpdate(updateId);
     JobUpdate builder = update.newBuilder();
     builder.getInstructions().getSettings().setUpdateOnlyTheseInstances(ImmutableSet.<Range>of());
 
@@ -205,9 +208,9 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveJobUpdateNullInstanceOverrides() {
-    String updateId = "u1";
+    IJobUpdateKey updateId = makeKey("u1");
 
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdate update = makeJobUpdate(updateId);
     JobUpdate builder = update.newBuilder();
     builder.getInstructions().getSettings().setUpdateOnlyTheseInstances(ImmutableSet.<Range>of());
 
@@ -221,8 +224,8 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = StorageException.class)
   public void testSaveJobUpdateTwiceThrows() {
-    String updateId = "u1";
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdateKey updateId = makeKey("u1");
+    IJobUpdate update = makeJobUpdate(updateId);
 
     saveUpdate(update, Optional.of("lock1"));
     saveUpdate(update, Optional.of("lock2"));
@@ -230,9 +233,9 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveJobEvents() {
-    String updateId = "u3";
+    IJobUpdateKey updateId = makeKey("u3");
     String user = "test";
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdate update = makeJobUpdate(updateId);
     IJobUpdateEvent event1 = makeJobUpdateEvent(ROLLING_FORWARD, 124L, user);
     IJobUpdateEvent event2 = makeJobUpdateEvent(ROLL_FORWARD_PAUSED, 125L, user);
 
@@ -256,8 +259,8 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveInstanceEvents() {
-    String updateId = "u3";
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdateKey updateId = makeKey("u3");
+    IJobUpdate update = makeJobUpdate(updateId);
     IJobInstanceUpdateEvent event1 = makeJobInstanceEvent(0, 125L, INSTANCE_UPDATED);
     IJobInstanceUpdateEvent event2 = makeJobInstanceEvent(1, 126L, INSTANCE_ROLLING_BACK);
 
@@ -283,18 +286,18 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = StorageException.class)
   public void testSaveJobEventWithoutUpdateFails() {
-    saveJobEvent(makeJobUpdateEvent(ROLLING_FORWARD, 123L), "u2");
+    saveJobEvent(makeJobUpdateEvent(ROLLING_FORWARD, 123L), makeKey("u2"));
   }
 
   @Test(expected = StorageException.class)
   public void testSaveInstanceEventWithoutUpdateFails() {
-    saveJobInstanceEvent(makeJobInstanceEvent(0, 125L, INSTANCE_UPDATED), "u1");
+    saveJobInstanceEvent(makeJobInstanceEvent(0, 125L, INSTANCE_UPDATED), makeKey("u1"));
   }
 
   @Test
   public void testSaveJobUpdateStateIgnored() {
-    String updateId = "u1";
-    IJobUpdate update = populateExpected(makeJobUpdate(JOB, updateId), ABORTED, 567L, 567L);
+    IJobUpdateKey updateId = makeKey("u1");
+    IJobUpdate update = populateExpected(makeJobUpdate(updateId), ABORTED, 567L, 567L);
     saveUpdate(update, Optional.of("lock1"));
 
     // Assert state fields were ignored.
@@ -303,11 +306,11 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testSaveJobUpdateWithoutEventFailsSelect() {
-    final String updateId = "u3";
+    final IJobUpdateKey updateId = makeKey("u3");
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
-        IJobUpdate update = makeJobUpdate(JOB, updateId);
+        IJobUpdate update = makeJobUpdate(updateId);
         storeProvider.getLockStore().saveLock(makeLock(update.getSummary().getJobKey(), "lock1"));
         storeProvider.getJobUpdateStore().saveJobUpdate(update, Optional.of("lock1"));
       }
@@ -317,12 +320,10 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testMultipleJobDetails() {
-    String updateId1 = "u1";
-    String updateId2 = "u2";
-    IJobUpdateDetails details1 =
-        makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name1"), updateId1));
-    IJobUpdateDetails details2 =
-        makeJobDetails(makeJobUpdate(JobKeys.from("role", "env", "name2"), updateId2));
+    IJobUpdateKey updateId1 = makeKey(JobKeys.from("role", "env", "name1"), "u1");
+    IJobUpdateKey updateId2 = makeKey(JobKeys.from("role", "env", "name2"), "u2");
+    IJobUpdateDetails details1 = makeJobDetails(makeJobUpdate(updateId1));
+    IJobUpdateDetails details2 = makeJobDetails(makeJobUpdate(updateId2));
 
     assertEquals(ImmutableList.<IJobInstanceUpdateEvent>of(), getInstanceEvents(updateId2, 3));
 
@@ -382,8 +383,8 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testTruncateJobUpdates() {
-    String updateId = "u5";
-    IJobUpdate update = makeJobUpdate(JOB, updateId);
+    IJobUpdateKey updateId = makeKey("u5");
+    IJobUpdate update = makeJobUpdate(updateId);
     IJobUpdateEvent updateEvent = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_FORWARD, 123L));
     IJobInstanceUpdateEvent instanceEvent = IJobInstanceUpdateEvent.build(
         new JobInstanceUpdateEvent(0, 125L, INSTANCE_ROLLBACK_FAILED));
@@ -403,22 +404,22 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testPruneHistory() {
-    String updateId1 = "u11";
-    String updateId2 = "u12";
-    String updateId3 = "u13";
-    String updateId4 = "u14";
-    String updateId5 = "u15";
-    String updateId6 = "u16";
-    String updateId7 = "u17";
-
+    IJobUpdateKey updateId1 = makeKey("u11");
+    IJobUpdateKey updateId2 = makeKey("u12");
+    IJobUpdateKey updateId3 = makeKey("u13");
+    IJobUpdateKey updateId4 = makeKey("u14");
     IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
-    IJobUpdate update1 = makeJobUpdate(JOB, updateId1);
-    IJobUpdate update2 = makeJobUpdate(JOB, updateId2);
-    IJobUpdate update3 = makeJobUpdate(JOB, updateId3);
-    IJobUpdate update4 = makeJobUpdate(JOB, updateId4);
-    IJobUpdate update5 = makeJobUpdate(job2, updateId5);
-    IJobUpdate update6 = makeJobUpdate(job2, updateId6);
-    IJobUpdate update7 = makeJobUpdate(job2, updateId7);
+    IJobUpdateKey updateId5 = makeKey(job2, "u15");
+    IJobUpdateKey updateId6 = makeKey(job2, "u16");
+    IJobUpdateKey updateId7 = makeKey(job2, "u17");
+
+    IJobUpdate update1 = makeJobUpdate(updateId1);
+    IJobUpdate update2 = makeJobUpdate(updateId2);
+    IJobUpdate update3 = makeJobUpdate(updateId3);
+    IJobUpdate update4 = makeJobUpdate(updateId4);
+    IJobUpdate update5 = makeJobUpdate(updateId5);
+    IJobUpdate update6 = makeJobUpdate(updateId6);
+    IJobUpdate update7 = makeJobUpdate(updateId7);
 
     IJobUpdateEvent updateEvent1 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_BACK, 123L));
     IJobUpdateEvent updateEvent2 = IJobUpdateEvent.build(new JobUpdateEvent(ABORTED, 124L));
@@ -462,7 +463,7 @@ public class DBJobUpdateStoreTest {
     long pruningThreshold = 120L;
 
     // No updates pruned.
-    assertEquals(ImmutableSet.<String>of(), pruneHistory(3, pruningThreshold));
+    assertEquals(ImmutableSet.<IJobUpdateKey>of(), pruneHistory(3, pruningThreshold));
     assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
     assertEquals(Optional.of(update6), getUpdate(updateId6));
     assertEquals(Optional.of(update5), getUpdate(updateId5));
@@ -513,7 +514,7 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = StorageException.class)
   public void testSaveUpdateWithoutLock() {
-    final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+    final IJobUpdate update = makeJobUpdate(makeKey("updateId"));
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
@@ -524,29 +525,29 @@ public class DBJobUpdateStoreTest {
 
   @Test(expected = StorageException.class)
   public void testSaveTwoUpdatesForOneJob() {
-    final IJobUpdate update = makeJobUpdate(JOB, "updateId");
+    final IJobUpdate update = makeJobUpdate(makeKey("updateId"));
     saveUpdate(update, Optional.of("lock1"));
     saveUpdate(update, Optional.of("lock2"));
   }
 
   @Test(expected = StorageException.class)
   public void testSaveTwoUpdatesSameJobKey() {
-    final IJobUpdate update1 = makeJobUpdate(JOB, "update1");
-    final IJobUpdate update2 = makeJobUpdate(JOB, "update2");
+    final IJobUpdate update1 = makeJobUpdate(makeKey("update1"));
+    final IJobUpdate update2 = makeJobUpdate(makeKey("update2"));
     saveUpdate(update1, Optional.of("lock1"));
     saveUpdate(update2, Optional.of("lock1"));
   }
 
   @Test
   public void testLockCleared() {
-    final IJobUpdate update = makeJobUpdate(JOB, "update1");
+    final IJobUpdate update = makeJobUpdate(makeKey("update1"));
     saveUpdate(update, Optional.of("lock1"));
 
     removeLock(update, "lock1");
 
     assertEquals(
         Optional.of(updateJobDetails(populateExpected(update), FIRST_EVENT)),
-        getUpdateDetails("update1"));
+        getUpdateDetails(makeKey("update1")));
     assertEquals(
         ImmutableSet.of(
             new StoredJobUpdateDetails(
@@ -559,7 +560,7 @@ public class DBJobUpdateStoreTest {
         getSummaries(new JobUpdateQuery().setUpdateId("update1")));
 
     // If the lock has been released for this job, we can start another update.
-    saveUpdate(makeJobUpdate(JOB, "update2"), Optional.of("lock2"));
+    saveUpdate(makeJobUpdate(makeKey("update2")), Optional.of("lock2"));
   }
 
   private static final Optional<String> NO_TOKEN = Optional.absent();
@@ -569,8 +570,10 @@ public class DBJobUpdateStoreTest {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
-        final IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), "update1");
-        final IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), "update2");
+        final IJobUpdate update1 =
+            makeJobUpdate(makeKey(JobKeys.from("role", "env", "name1"), "update1"));
+        final IJobUpdate update2 =
+            makeJobUpdate(makeKey(JobKeys.from("role", "env", "name2"), "update2"));
         saveUpdate(update1, Optional.of("lock1"));
         assertEquals(
             Optional.of("lock1"),
@@ -609,13 +612,15 @@ public class DBJobUpdateStoreTest {
     IJobKey job4 = JobKeys.from(role1, "env", "name4");
     IJobKey job5 = JobKeys.from("role", "env", "name5");
     IJobUpdateSummary s1 =
-        saveSummary(job1, "u1", 1230L, ROLLED_BACK, "user", Optional.of("lock1"));
-    IJobUpdateSummary s2 =  saveSummary(job2, "u2", 1231L, ABORTED, "user", Optional.of("lock2"));
-    IJobUpdateSummary s3 = saveSummary(job3, "u3", 1239L, ERROR, "user2", Optional.of("lock3"));
+        saveSummary(makeKey(job1, "u1"), 1230L, ROLLED_BACK, "user", Optional.of("lock1"));
+    IJobUpdateSummary s2 =
+        saveSummary(makeKey(job2, "u2"), 1231L, ABORTED, "user", Optional.of("lock2"));
+    IJobUpdateSummary s3 =
+        saveSummary(makeKey(job3, "u3"), 1239L, ERROR, "user2", Optional.of("lock3"));
     IJobUpdateSummary s4 =
-        saveSummary(job4, "u4", 1234L, ROLL_BACK_PAUSED, "user3", Optional.of("lock4"));
+        saveSummary(makeKey(job4, "u4"), 1234L, ROLL_BACK_PAUSED, "user3", Optional.of("lock4"));
     IJobUpdateSummary s5 =
-        saveSummary(job5, "u5", 1235L, ROLLING_FORWARD, "user4", Optional.of("lock5"));
+        saveSummary(makeKey(job5, "u5"), 1235L, ROLLING_FORWARD, "user4", Optional.of("lock5"));
 
     // Test empty query returns all.
     assertEquals(ImmutableList.of(s3, s5, s4, s2, s1), getSummaries(new JobUpdateQuery()));
@@ -684,13 +689,13 @@ public class DBJobUpdateStoreTest {
 
   @Test
   public void testQueryDetails() {
-    String updateId1 = "u1";
-    String updateId2 = "u2";
     IJobKey jobKey1 = JobKeys.from("role1", "env", "name1");
+    IJobUpdateKey updateId1 = makeKey(jobKey1, "u1");
     IJobKey jobKey2 = JobKeys.from("role2", "env", "name2");
+    IJobUpdateKey updateId2 = makeKey(jobKey2, "u2");
 
-    IJobUpdate update1 = makeJobUpdate(jobKey1, updateId1);
-    IJobUpdate update2 = makeJobUpdate(jobKey2, updateId2);
+    IJobUpdate update1 = makeJobUpdate(updateId1);
+    IJobUpdate update2 = makeJobUpdate(updateId2);
 
     assertEquals(ImmutableList.<IJobInstanceUpdateEvent>of(), getInstanceEvents(updateId2, 3));
 
@@ -730,7 +735,7 @@ public class DBJobUpdateStoreTest {
     // Test query by update ID.
     assertEquals(
         ImmutableList.of(details1),
-        queryDetails(new JobUpdateQuery().setUpdateId(updateId1)));
+        queryDetails(new JobUpdateQuery().setUpdateId(updateId1.getId())));
 
     // Test query by role.
     assertEquals(
@@ -753,45 +758,53 @@ public class DBJobUpdateStoreTest {
         queryDetails(new JobUpdateQuery().setRole("no match")));
   }
 
+  private static IJobUpdateKey makeKey(String id) {
+    return makeKey(JOB, id);
+  }
+
+  private static IJobUpdateKey makeKey(IJobKey job, String id) {
+    return IJobUpdateKey.build(new JobUpdateKey(job.newBuilder(), id));
+  }
+
   private void assertUpdate(IJobUpdate expected) {
-    String updateId = expected.getSummary().getUpdateId();
-    assertEquals(populateExpected(expected), getUpdate(updateId).get());
-    assertEquals(getUpdate(updateId).get(), getUpdateDetails(updateId).get().getUpdate());
-    assertEquals(getUpdateInstructions(updateId).get(), expected.getInstructions());
+    IJobUpdateKey key = Updates.getKey(expected.getSummary());
+    assertEquals(populateExpected(expected), getUpdate(key).get());
+    assertEquals(getUpdate(key).get(), getUpdateDetails(key).get().getUpdate());
+    assertEquals(getUpdateInstructions(key).get(), expected.getInstructions());
   }
 
-  private Optional<IJobUpdate> getUpdate(final String updateId) {
+  private Optional<IJobUpdate> getUpdate(final IJobUpdateKey key) {
     return storage.read(new Quiet<Optional<IJobUpdate>>() {
       @Override
       public Optional<IJobUpdate> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobUpdateStore().fetchJobUpdate(updateId);
+        return storeProvider.getJobUpdateStore().fetchJobUpdate(key.getId());
       }
     });
   }
 
-  private List<IJobInstanceUpdateEvent> getInstanceEvents(final String updateId, final int id) {
+  private List<IJobInstanceUpdateEvent> getInstanceEvents(final IJobUpdateKey key, final int id) {
     return storage.read(new Quiet<List<IJobInstanceUpdateEvent>>() {
       @Override
       public List<IJobInstanceUpdateEvent> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobUpdateStore().fetchInstanceEvents(updateId, id);
+        return storeProvider.getJobUpdateStore().fetchInstanceEvents(key.getId(), id);
       }
     });
   }
 
-  private Optional<IJobUpdateInstructions> getUpdateInstructions(final String updateId) {
+  private Optional<IJobUpdateInstructions> getUpdateInstructions(final IJobUpdateKey key) {
     return storage.read(new Quiet<Optional<IJobUpdateInstructions>>() {
       @Override
       public Optional<IJobUpdateInstructions> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobUpdateStore().fetchJobUpdateInstructions(updateId);
+        return storeProvider.getJobUpdateStore().fetchJobUpdateInstructions(key.getId());
       }
     });
   }
 
-  private Optional<IJobUpdateDetails> getUpdateDetails(final String updateId) {
+  private Optional<IJobUpdateDetails> getUpdateDetails(final IJobUpdateKey key) {
     return storage.read(new Quiet<Optional<IJobUpdateDetails>>() {
       @Override
       public Optional<IJobUpdateDetails> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(updateId);
+        return storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key.getId());
       }
     });
   }
@@ -843,8 +856,8 @@ public class DBJobUpdateStoreTest {
         }
         storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
         storeProvider.getJobUpdateStore().saveJobUpdateEvent(
-            FIRST_EVENT,
-            update.getSummary().getUpdateId());
+            Updates.getKey(update.getSummary()),
+            FIRST_EVENT);
       }
     });
 
@@ -866,20 +879,20 @@ public class DBJobUpdateStoreTest {
     return update;
   }
 
-  private void saveJobEvent(final IJobUpdateEvent event, final String updateId) {
+  private void saveJobEvent(final IJobUpdateEvent event, final IJobUpdateKey key) {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
+        storeProvider.getJobUpdateStore().saveJobUpdateEvent(key, event);
       }
     });
   }
 
-  private void saveJobInstanceEvent(final IJobInstanceUpdateEvent event, final String updateId) {
+  private void saveJobInstanceEvent(final IJobInstanceUpdateEvent event, final IJobUpdateKey key) {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(event, updateId);
+        storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(key, event);
       }
     });
   }
@@ -893,10 +906,10 @@ public class DBJobUpdateStoreTest {
     });
   }
 
-  private Set<String> pruneHistory(final int retainCount, final long pruningThresholdMs) {
-    return storage.write(new MutateWork.Quiet<Set<String>>() {
+  private Set<IJobUpdateKey> pruneHistory(final int retainCount, final long pruningThresholdMs) {
+    return storage.write(new MutateWork.Quiet<Set<IJobUpdateKey>>() {
       @Override
-      public Set<String> apply(MutableStoreProvider storeProvider) {
+      public Set<IJobUpdateKey> apply(MutableStoreProvider storeProvider) {
         return storeProvider.getJobUpdateStore().pruneHistory(retainCount, pruningThresholdMs);
       }
     });
@@ -979,29 +992,28 @@ public class DBJobUpdateStoreTest {
         .setInstanceEvents(IJobInstanceUpdateEvent.toBuildersList(instanceEvents)));
   }
 
-  private IJobUpdateSummary makeSummary(IJobKey jobKey, String updateId, String user) {
+  private IJobUpdateSummary makeSummary(IJobUpdateKey key, String user) {
     return IJobUpdateSummary.build(new JobUpdateSummary()
-        .setUpdateId(updateId)
-        .setJobKey(jobKey.newBuilder())
+        .setUpdateId(key.getId())
+        .setJobKey(key.getJob().newBuilder())
         .setUser(user));
   }
 
   private IJobUpdateSummary saveSummary(
-      IJobKey jobKey,
-      String updateId,
+      IJobUpdateKey key,
       Long modifiedTimestampMs,
       JobUpdateStatus status,
       String user,
       Optional<String> lockToken) {
 
     IJobUpdateSummary summary = IJobUpdateSummary.build(new JobUpdateSummary()
-        .setUpdateId(updateId)
-        .setJobKey(jobKey.newBuilder())
+        .setUpdateId(key.getId())
+        .setJobKey(key.getJob().newBuilder())
         .setUser(user));
 
     IJobUpdate update = makeJobUpdate(summary);
     saveUpdate(update, lockToken);
-    saveJobEvent(makeJobUpdateEvent(status, modifiedTimestampMs), updateId);
+    saveJobEvent(makeJobUpdateEvent(status, modifiedTimestampMs), key);
     return populateExpected(update, status, CREATED_MS, modifiedTimestampMs).getSummary();
   }
 
@@ -1009,9 +1021,9 @@ public class DBJobUpdateStoreTest {
     return IJobUpdate.build(makeJobUpdate().newBuilder().setSummary(summary.newBuilder()));
   }
 
-  private IJobUpdate makeJobUpdate(IJobKey jobKey, String updateId) {
+  private IJobUpdate makeJobUpdate(IJobUpdateKey key) {
     return IJobUpdate.build(makeJobUpdate().newBuilder()
-        .setSummary(makeSummary(jobKey, updateId, "user").newBuilder()));
+        .setSummary(makeSummary(key, "user").newBuilder()));
   }
 
   private IJobUpdate makeJobUpdate() {


Mime
View raw message