aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] aurora git commit: Add a task store implementation that uses a relational database.
Date Tue, 12 May 2015 23:51:29 GMT
Add a task store implementation that uses a relational database.

Bugs closed: AURORA-556

Reviewed at https://reviews.apache.org/r/33612/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/bf7f9b7f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/bf7f9b7f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/bf7f9b7f

Branch: refs/heads/master
Commit: bf7f9b7f93637f78ec9f029b70a7c4bdac2f206a
Parents: be75c36
Author: Bill Farner <wfarner@apache.org>
Authored: Tue May 12 16:51:07 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue May 12 16:51:07 2015 -0700

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/api.thrift     |   7 +-
 examples/vagrant/upstart/aurora-scheduler.conf  |   1 +
 .../aurora/benchmark/ThriftApiBenchmarks.java   |   2 +-
 .../aurora/scheduler/app/SchedulerMain.java     |   2 +-
 .../scheduler/storage/db/DBJobUpdateStore.java  | 276 --------------
 .../scheduler/storage/db/DbJobUpdateStore.java  | 276 ++++++++++++++
 .../aurora/scheduler/storage/db/DbModule.java   | 120 ++++--
 .../aurora/scheduler/storage/db/DbStorage.java  |  10 +
 .../scheduler/storage/db/DbTaskStore.java       | 361 +++++++++++++++++++
 .../aurora/scheduler/storage/db/DbUtil.java     |   3 +
 .../scheduler/storage/db/TaskConfigManager.java | 138 +++++++
 .../scheduler/storage/db/TaskConfigMapper.java  | 167 +++++++++
 .../aurora/scheduler/storage/db/TaskMapper.java |  85 +++++
 .../storage/db/shims/ContainerShim.java         |  42 +++
 .../storage/db/shims/TaskConstraintShim.java    |  42 +++
 .../typehandlers/ScheduleStatusTypeHandler.java |  26 ++
 .../storage/db/typehandlers/TypeHandlers.java   |   1 +
 .../storage/db/views/AssignedPort.java          |  40 ++
 .../storage/db/views/ScheduledTaskWrapper.java  |  48 +++
 .../storage/db/views/TaskConfigRow.java         |  60 +++
 .../scheduler/storage/db/views/TaskLink.java    |  40 ++
 .../storage/mem/InMemStoresModule.java          |  27 +-
 .../storage/db/JobUpdateDetailsMapper.xml       |  12 +-
 .../scheduler/storage/db/TaskConfigMapper.xml   | 323 +++++++++++++++++
 .../aurora/scheduler/storage/db/TaskMapper.xml  | 228 ++++++++++++
 .../scheduler/state/StateManagerImplTest.java   |   1 +
 .../storage/AbstractTaskStoreTest.java          |  19 +-
 .../storage/db/DbJobUpdateStoreTest.java        |   2 +-
 .../scheduler/storage/db/DbTaskStoreTest.java   |  66 ++++
 .../storage/mem/InMemTaskStoreTest.java         |   3 +-
 .../storage/mem/StorageTransactionTest.java     |  42 +--
 31 files changed, 2125 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 0182ecb..dd54e5b 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -245,8 +245,6 @@ struct TaskConfig {
  20: set<Constraint> constraints
  /** a list of named ports this task requests */
  21: set<string> requestedPorts
- /** the container the task should use to execute */
- 29: optional Container container = { "mesos": {} }
  /**
   * Custom links to include when displaying this task on the scheduler dashboard. Keys are anchor
   * text, values are URLs. Wildcards are supported for dynamic link crafting based on host, ports,
@@ -258,6 +256,11 @@ struct TaskConfig {
  25: optional ExecutorConfig executorConfig
  /** Used to display additional details in the UI. */
  27: optional set<Metadata> metadata
+
+ // This field is deliberately placed at the end to work around a bug in the immutable wrapper
+ // code generator.  See AURORA-1185 for details.
+ /** the container the task should use to execute */
+ 29: optional Container container = { "mesos": {} }
 }
 
 /** Defines the policy for launching a new cron job when one is already running. */

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf
index cc4864c..f4b867c 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -42,4 +42,5 @@ exec bin/aurora-scheduler \
   -logtostderr \
   -allowed_container_types=MESOS,DOCKER \
   -http_authentication_mechanism=BASIC \
+  -use_beta_db_task_store=true \
   -shiro_ini_path=etc/shiro.example.ini

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index 88d27dd..c3f8b25 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -151,7 +151,7 @@ public class ThriftApiBenchmarks {
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
           }
         },
-        new DbModule(Bindings.KeyFactory.PLAIN),
+        DbModule.productionModule(Bindings.KeyFactory.PLAIN),
         new ThriftModule.ReadOnly());
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 239c616..c31446c 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -155,7 +155,7 @@ public class SchedulerMain extends AbstractApplication {
         .addAll(getExtraModules())
         .add(getPersistentStorageModule())
         .add(new CronModule())
-        .add(new DbModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)))
+        .add(DbModule.productionModule(Bindings.annotatedKeyFactory(Storage.Volatile.class)))
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
deleted file mode 100644
index ea56007..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.db;
-
-import java.util.List;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.MorePreconditions;
-
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
-import org.apache.aurora.scheduler.storage.entities.IRange;
-
-import static java.util.Objects.requireNonNull;
-
-import static com.twitter.common.inject.TimedInterceptor.Timed;
-
-/**
- * A relational database-backed job update store.
- */
-public class DBJobUpdateStore implements JobUpdateStore.Mutable {
-
-  private final JobKeyMapper jobKeyMapper;
-  private final JobUpdateDetailsMapper detailsMapper;
-  private final JobUpdateEventMapper jobEventMapper;
-  private final JobInstanceUpdateEventMapper instanceEventMapper;
-  private final CachedCounters stats;
-
-  @Inject
-  DBJobUpdateStore(
-      JobKeyMapper jobKeyMapper,
-      JobUpdateDetailsMapper detailsMapper,
-      JobUpdateEventMapper jobEventMapper,
-      JobInstanceUpdateEventMapper instanceEventMapper,
-      CachedCounters stats) {
-
-    this.jobKeyMapper = requireNonNull(jobKeyMapper);
-    this.detailsMapper = requireNonNull(detailsMapper);
-    this.jobEventMapper = requireNonNull(jobEventMapper);
-    this.instanceEventMapper = requireNonNull(instanceEventMapper);
-    this.stats = requireNonNull(stats);
-  }
-
-  @Timed("job_update_store_save_update")
-  @Override
-  public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
-    requireNonNull(update);
-    if (!update.getInstructions().isSetDesiredState()
-        && update.getInstructions().getInitialState().isEmpty()) {
-      throw new IllegalArgumentException(
-          "Missing both initial and desired states. At least one is required.");
-    }
-
-    IJobUpdateSummary summary = update.getSummary();
-    jobKeyMapper.merge(summary.getJobKey().newBuilder());
-    detailsMapper.insert(update.newBuilder());
-
-    IJobUpdateKey key = IJobUpdateKey.build(
-        new JobUpdateKey(summary.getJobKey().newBuilder(), summary.getUpdateId()));
-    if (lockToken.isPresent()) {
-      detailsMapper.insertLockToken(key, lockToken.get());
-    }
-
-    // Insert optional instance update overrides.
-    Set<IRange> instanceOverrides =
-        update.getInstructions().getSettings().getUpdateOnlyTheseInstances();
-
-    if (!instanceOverrides.isEmpty()) {
-      detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides));
-    }
-
-    // Insert desired state task config and instance mappings.
-    if (update.getInstructions().isSetDesiredState()) {
-      IInstanceTaskConfig desired = update.getInstructions().getDesiredState();
-      detailsMapper.insertTaskConfig(
-          key,
-          desired.getTask().newBuilder(),
-          true,
-          new InsertResult());
-
-      detailsMapper.insertDesiredInstances(
-          key,
-          IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances())));
-    }
-
-    // Insert initial state task configs and instance mappings.
-    if (!update.getInstructions().getInitialState().isEmpty()) {
-      for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) {
-        InsertResult result = new InsertResult();
-        detailsMapper.insertTaskConfig(key, config.getTask().newBuilder(), false, result);
-
-        detailsMapper.insertTaskConfigInstances(
-            result.getId(),
-            IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances())));
-      }
-    }
-  }
-
-  @VisibleForTesting
-  static String statName(JobUpdateStatus status) {
-    return "update_transition_" + status;
-  }
-
-  @Timed("job_update_store_save_event")
-  @Override
-  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
-    stats.get(statName(event.getStatus())).incrementAndGet();
-    jobEventMapper.insert(key, event.newBuilder());
-  }
-
-  @Timed("job_update_store_save_instance_event")
-  @Override
-  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
-    instanceEventMapper.insert(key, event.newBuilder());
-  }
-
-  @Timed("job_update_store_delete_all")
-  @Override
-  public void deleteAllUpdatesAndEvents() {
-    detailsMapper.truncate();
-  }
-
-  private static final Function<PruneVictim, Long> GET_ROW_ID = new Function<PruneVictim, Long>() {
-    @Override
-    public Long apply(PruneVictim victim) {
-      return victim.getRowId();
-    }
-  };
-
-  private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY =
-      new Function<PruneVictim, IJobUpdateKey>() {
-        @Override
-        public IJobUpdateKey apply(PruneVictim victim) {
-          return IJobUpdateKey.build(victim.getUpdate());
-        }
-      };
-
-  @Timed("job_update_store_prune_history")
-  @Override
-  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
-    ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder();
-
-    Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
-        perJobRetainCount,
-        historyPruneThresholdMs);
-
-    for (Long jobKeyId : jobKeyIdsToPrune) {
-      Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims(
-          jobKeyId,
-          perJobRetainCount,
-          historyPruneThresholdMs);
-
-      detailsMapper.deleteCompletedUpdates(
-          FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet());
-      pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY));
-    }
-
-    return pruned.build();
-  }
-
-  @Timed("job_update_store_fetch_summaries")
-  @Override
-  public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
-    return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder()));
-  }
-
-  @Timed("job_update_store_fetch_details_list")
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
-    return FluentIterable
-        .from(detailsMapper.selectDetailsList(query.newBuilder()))
-        .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
-          @Override
-          public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
-            return IJobUpdateDetails.build(input.getDetails());
-          }
-        }).toList();
-  }
-
-  @Timed("job_update_store_fetch_details")
-  @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) {
-    return Optional.fromNullable(detailsMapper.selectDetails(key))
-        .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
-          @Override
-          public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
-            return IJobUpdateDetails.build(input.getDetails());
-          }
-        });
-  }
-
-  @Timed("job_update_store_fetch_update")
-  @Override
-  public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
-    return Optional.fromNullable(detailsMapper.selectUpdate(key))
-        .transform(new Function<JobUpdate, IJobUpdate>() {
-          @Override
-          public IJobUpdate apply(JobUpdate input) {
-            return IJobUpdate.build(input);
-          }
-        });
-  }
-
-  @Timed("job_update_store_fetch_instructions")
-  @Override
-  public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) {
-    return Optional.fromNullable(detailsMapper.selectInstructions(key))
-        .transform(new Function<JobUpdateInstructions, IJobUpdateInstructions>() {
-          @Override
-          public IJobUpdateInstructions apply(JobUpdateInstructions input) {
-            return IJobUpdateInstructions.build(input);
-          }
-        });
-  }
-
-  @Timed("job_update_store_fetch_all_details")
-  @Override
-  public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return ImmutableSet.copyOf(detailsMapper.selectAllDetails());
-  }
-
-  @Timed("job_update_store_fetch_update_key")
-  @Override
-  public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) {
-    return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId))
-        .transform(IJobUpdateKey.FROM_BUILDER);
-  }
-
-  @Timed("job_update_store_get_lock_token")
-  @Override
-  public Optional<String> getLockToken(IJobUpdateKey key) {
-    // We assume here that cascading deletes will cause a lock-update associative row to disappear
-    // when the lock is invalidated.  This further assumes that a lock row is deleted when a lock
-    // is no longer valid.
-    return Optional.fromNullable(detailsMapper.selectLockToken(key));
-  }
-
-  @Timed("job_update_store_fetch_instance_events")
-  @Override
-  public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) {
-    return IJobInstanceUpdateEvent.listFromBuilders(
-        detailsMapper.selectInstanceUpdateEvents(key, instanceId));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
new file mode 100644
index 0000000..4b9d7f5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import java.util.List;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.base.MorePreconditions;
+
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IRange;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.inject.TimedInterceptor.Timed;
+
+/**
+ * A relational database-backed job update store.
+ */
+public class DbJobUpdateStore implements JobUpdateStore.Mutable {
+
+  private final JobKeyMapper jobKeyMapper;
+  private final JobUpdateDetailsMapper detailsMapper;
+  private final JobUpdateEventMapper jobEventMapper;
+  private final JobInstanceUpdateEventMapper instanceEventMapper;
+  private final CachedCounters stats;
+
+  @Inject
+  DbJobUpdateStore(
+      JobKeyMapper jobKeyMapper,
+      JobUpdateDetailsMapper detailsMapper,
+      JobUpdateEventMapper jobEventMapper,
+      JobInstanceUpdateEventMapper instanceEventMapper,
+      CachedCounters stats) {
+
+    this.jobKeyMapper = requireNonNull(jobKeyMapper);
+    this.detailsMapper = requireNonNull(detailsMapper);
+    this.jobEventMapper = requireNonNull(jobEventMapper);
+    this.instanceEventMapper = requireNonNull(instanceEventMapper);
+    this.stats = requireNonNull(stats);
+  }
+
+  @Timed("job_update_store_save_update")
+  @Override
+  public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
+    requireNonNull(update);
+    if (!update.getInstructions().isSetDesiredState()
+        && update.getInstructions().getInitialState().isEmpty()) {
+      throw new IllegalArgumentException(
+          "Missing both initial and desired states. At least one is required.");
+    }
+
+    IJobUpdateSummary summary = update.getSummary();
+    jobKeyMapper.merge(summary.getJobKey().newBuilder());
+    detailsMapper.insert(update.newBuilder());
+
+    IJobUpdateKey key = IJobUpdateKey.build(
+        new JobUpdateKey(summary.getJobKey().newBuilder(), summary.getUpdateId()));
+    if (lockToken.isPresent()) {
+      detailsMapper.insertLockToken(key, lockToken.get());
+    }
+
+    // Insert optional instance update overrides.
+    Set<IRange> instanceOverrides =
+        update.getInstructions().getSettings().getUpdateOnlyTheseInstances();
+
+    if (!instanceOverrides.isEmpty()) {
+      detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides));
+    }
+
+    // Insert desired state task config and instance mappings.
+    if (update.getInstructions().isSetDesiredState()) {
+      IInstanceTaskConfig desired = update.getInstructions().getDesiredState();
+      detailsMapper.insertTaskConfig(
+          key,
+          desired.getTask().newBuilder(),
+          true,
+          new InsertResult());
+
+      detailsMapper.insertDesiredInstances(
+          key,
+          IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances())));
+    }
+
+    // Insert initial state task configs and instance mappings.
+    if (!update.getInstructions().getInitialState().isEmpty()) {
+      for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) {
+        InsertResult result = new InsertResult();
+        detailsMapper.insertTaskConfig(key, config.getTask().newBuilder(), false, result);
+
+        detailsMapper.insertTaskConfigInstances(
+            result.getId(),
+            IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances())));
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static String statName(JobUpdateStatus status) {
+    return "update_transition_" + status;
+  }
+
+  @Timed("job_update_store_save_event")
+  @Override
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    stats.get(statName(event.getStatus())).incrementAndGet();
+    jobEventMapper.insert(key, event.newBuilder());
+  }
+
+  @Timed("job_update_store_save_instance_event")
+  @Override
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    instanceEventMapper.insert(key, event.newBuilder());
+  }
+
+  @Timed("job_update_store_delete_all")
+  @Override
+  public void deleteAllUpdatesAndEvents() {
+    detailsMapper.truncate();
+  }
+
+  private static final Function<PruneVictim, Long> GET_ROW_ID = new Function<PruneVictim, Long>() {
+    @Override
+    public Long apply(PruneVictim victim) {
+      return victim.getRowId();
+    }
+  };
+
+  private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY =
+      new Function<PruneVictim, IJobUpdateKey>() {
+        @Override
+        public IJobUpdateKey apply(PruneVictim victim) {
+          return IJobUpdateKey.build(victim.getUpdate());
+        }
+      };
+
+  @Timed("job_update_store_prune_history")
+  @Override
+  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+    ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder();
+
+    Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
+        perJobRetainCount,
+        historyPruneThresholdMs);
+
+    for (long jobKeyId : jobKeyIdsToPrune) {
+      Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims(
+          jobKeyId,
+          perJobRetainCount,
+          historyPruneThresholdMs);
+
+      detailsMapper.deleteCompletedUpdates(
+          FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet());
+      pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY));
+    }
+
+    return pruned.build();
+  }
+
+  @Timed("job_update_store_fetch_summaries")
+  @Override
+  public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
+    return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder()));
+  }
+
+  @Timed("job_update_store_fetch_details_list")
+  @Override
+  public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
+    return FluentIterable
+        .from(detailsMapper.selectDetailsList(query.newBuilder()))
+        .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
+          @Override
+          public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
+            return IJobUpdateDetails.build(input.getDetails());
+          }
+        }).toList();
+  }
+
+  @Timed("job_update_store_fetch_details")
+  @Override
+  public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) {
+    return Optional.fromNullable(detailsMapper.selectDetails(key))
+        .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() {
+          @Override
+          public IJobUpdateDetails apply(StoredJobUpdateDetails input) {
+            return IJobUpdateDetails.build(input.getDetails());
+          }
+        });
+  }
+
+  @Timed("job_update_store_fetch_update")
+  @Override
+  public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+    return Optional.fromNullable(detailsMapper.selectUpdate(key))
+        .transform(new Function<JobUpdate, IJobUpdate>() {
+          @Override
+          public IJobUpdate apply(JobUpdate input) {
+            return IJobUpdate.build(input);
+          }
+        });
+  }
+
+  @Timed("job_update_store_fetch_instructions")
+  @Override
+  public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) {
+    return Optional.fromNullable(detailsMapper.selectInstructions(key))
+        .transform(new Function<JobUpdateInstructions, IJobUpdateInstructions>() {
+          @Override
+          public IJobUpdateInstructions apply(JobUpdateInstructions input) {
+            return IJobUpdateInstructions.build(input);
+          }
+        });
+  }
+
+  @Timed("job_update_store_fetch_all_details")
+  @Override
+  public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+    return ImmutableSet.copyOf(detailsMapper.selectAllDetails());
+  }
+
+  @Timed("job_update_store_fetch_update_key")
+  @Override
+  public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) {
+    return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId))
+        .transform(IJobUpdateKey.FROM_BUILDER);
+  }
+
+  @Timed("job_update_store_get_lock_token")
+  @Override
+  public Optional<String> getLockToken(IJobUpdateKey key) {
+    // We assume here that cascading deletes will cause a lock-update associative row to disappear
+    // when the lock is invalidated.  This further assumes that a lock row is deleted when a lock
+    // is no longer valid.
+    return Optional.fromNullable(detailsMapper.selectLockToken(key));
+  }
+
+  @Timed("job_update_store_fetch_instance_events")
+  @Override
+  public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) {
+    return IJobInstanceUpdateEvent.listFromBuilders(
+        detailsMapper.selectInstanceUpdateEvents(key, instanceId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 8859ca4..e351588 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.storage.db;
 
 import java.util.Set;
+import java.util.UUID;
 
 import javax.inject.Singleton;
 
@@ -21,8 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Key;
+import com.google.inject.Module;
 import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
 import com.twitter.common.inject.Bindings.KeyFactory;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
@@ -47,22 +54,17 @@ import static com.google.inject.name.Names.bindProperties;
 
 /**
  * Binding module for a relational database storage system.
- * <p>
- *   Currently only exposes bindings for:
- *   <ul>
- *     <li>{@link org.apache.aurora.scheduler.storage.db.DbStorage}</li>
- *     <li>{@link org.apache.ibatis.session.SqlSessionFactory}</li>
- *     <li>Keys provided by the provided{@code keyFactory} for:
- *        <ul>
- *          <li>{@link LockStore.Mutable}</li>
- *          <li>{@link QuotaStore.Mutable}</li>
- *          <li>{@link SchedulerStore.Mutable}</li>
- *        </ul>
- *     </li>
- *   </ul>
- * </p>
  */
-public class DbModule extends PrivateModule {
+public final class DbModule extends PrivateModule {
+
+  @CmdLine(name = "use_beta_db_task_store",
+      help = "Whether to use the experimental database-backed task store.")
+  private static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false);
+
+  @CmdLine(name = "slow_query_log_threshold",
+      help = "Log all queries that take at least this long to execute.")
+  private static final Arg<Amount<Long, Time>> SLOW_QUERY_LOG_THRESHOLD =
+      Arg.create(Amount.of(25L, Time.MILLISECONDS));
 
   private static final Set<Class<?>> MAPPER_CLASSES = ImmutableSet.<Class<?>>builder()
       .add(AttributeMapper.class)
@@ -74,19 +76,54 @@ public class DbModule extends PrivateModule {
       .add(JobUpdateDetailsMapper.class)
       .add(LockMapper.class)
       .add(QuotaMapper.class)
+      .add(TaskConfigMapper.class)
+      .add(TaskMapper.class)
       .build();
 
   private final KeyFactory keyFactory;
+  private final Module taskStoreModule;
   private final String jdbcSchema;
 
-  private DbModule(KeyFactory keyFactory, String jdbcSchema) {
+  private DbModule(KeyFactory keyFactory, Module taskStoreModule, String jdbcSchema) {
     this.keyFactory = requireNonNull(keyFactory);
+    this.taskStoreModule = requireNonNull(taskStoreModule);
     // We always disable the MvStore, as it is in beta as of this writing.
     this.jdbcSchema = jdbcSchema + ";MV_STORE=false";
   }
 
-  public DbModule(KeyFactory keyFactory) {
-    this(keyFactory, "aurora;DB_CLOSE_DELAY=-1");
+  /**
+   * Creates a module that will prepare a volatile storage system suitable for use in a production
+   * environment.
+   *
+   * @param keyFactory Binding scope for the storage system.
+   * @return A new database module for production.
+   */
+  public static Module productionModule(KeyFactory keyFactory) {
+    return new DbModule(
+        keyFactory,
+        USE_DB_TASK_STORE.get()
+            ? new TaskStoreModule(keyFactory)
+            : new InMemStoresModule.TaskStoreModule(keyFactory),
+        "aurora;DB_CLOSE_DELAY=-1");
+  }
+
+  /**
+   * Creates a module that will prepare a private in-memory database, using a specific task store
+   * implementation bound within the provided module.
+   *
+   * @param taskStoreModule Module providing task store bindings.
+   * @return A new database module for testing.
+   */
+  @VisibleForTesting
+  public static Module testModule(Module taskStoreModule) {
+    return new DbModule(
+        KeyFactory.PLAIN,
+        taskStoreModule,
+        // A non-zero close delay is used here to avoid eager database cleanup in tests that
+        // make use of multiple threads.  Since all test databases are separately scoped by the
+        // included UUID, multiple DB instances will overlap in time but they should be distinct
+        // in content.
+        "testdb-" + UUID.randomUUID().toString() + ";DB_CLOSE_DELAY=5");
   }
 
   /**
@@ -95,10 +132,8 @@ public class DbModule extends PrivateModule {
    * @return A new database module for testing.
    */
   @VisibleForTesting
-  public static DbModule testModule() {
-    // This creates a private in-memory database.  New connections will have a _new_ database,
-    // and closing the database will expunge its data.
-    return new DbModule(KeyFactory.PLAIN, "");
+  public static Module testModule() {
+    return testModule(new DbModule.TaskStoreModule(KeyFactory.PLAIN));
   }
 
   private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
@@ -131,9 +166,21 @@ public class DbModule extends PrivateModule {
         autoMappingBehavior(AutoMappingBehavior.FULL);
 
         addTypeHandlersClasses(TypeHandlers.getAll());
+
+        bind(new TypeLiteral<Amount<Long, Time>>() { }).toInstance(SLOW_QUERY_LOG_THRESHOLD.get());
+
+        // Exposed for unit tests.
+        bind(TaskConfigManager.class);
+        expose(TaskConfigManager.class);
+
+        // TODO(wfarner): Don't expose these bindings once the task store is directly bound here.
+        expose(TaskMapper.class);
+        expose(TaskConfigManager.class);
+        expose(JobKeyMapper.class);
       }
     });
     install(new InMemStoresModule(keyFactory));
+    install(taskStoreModule);
     expose(keyFactory.create(CronJobStore.Mutable.class));
     expose(keyFactory.create(TaskStore.Mutable.class));
 
@@ -141,7 +188,7 @@ public class DbModule extends PrivateModule {
     bindStore(LockStore.Mutable.class, DbLockStore.class);
     bindStore(QuotaStore.Mutable.class, DbQuotaStore.class);
     bindStore(SchedulerStore.Mutable.class, DbSchedulerStore.class);
-    bindStore(JobUpdateStore.Mutable.class, DBJobUpdateStore.class);
+    bindStore(JobUpdateStore.Mutable.class, DbJobUpdateStore.class);
 
     Key<Storage> storageKey = keyFactory.create(Storage.class);
     bind(storageKey).to(DbStorage.class);
@@ -151,4 +198,31 @@ public class DbModule extends PrivateModule {
     expose(DbStorage.class);
     expose(SqlSessionFactory.class);
   }
+
+  /**
+   * A module that binds a database task store.
+   * <p/>
+   * TODO(wfarner): Inline these bindings once there is only one task store implementation.
+   */
+  public static class TaskStoreModule extends PrivateModule {
+    private final KeyFactory keyFactory;
+
+    public TaskStoreModule(KeyFactory keyFactory) {
+      this.keyFactory = requireNonNull(keyFactory);
+    }
+
+    private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+      bind(binding).to(impl);
+      bind(impl).in(Singleton.class);
+      Key<T> key = keyFactory.create(binding);
+      bind(key).to(impl);
+      expose(key);
+    }
+
+    @Override
+    protected void configure() {
+      bindStore(TaskStore.Mutable.class, DbTaskStore.class);
+      expose(TaskStore.Mutable.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 1a6c3f2..bb61542 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -26,6 +26,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.CronJobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -38,6 +39,7 @@ import org.apache.ibatis.builder.StaticSqlSource;
 import org.apache.ibatis.exceptions.PersistenceException;
 import org.apache.ibatis.mapping.MappedStatement.Builder;
 import org.apache.ibatis.session.Configuration;
+import org.apache.ibatis.session.ExecutorType;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.mybatis.guice.transactional.Transactional;
@@ -190,6 +192,10 @@ class DbStorage extends AbstractIdleService implements Storage {
     String createStatementName = "create_tables";
     configuration.setMapUnderscoreToCamelCase(true);
 
+    // The ReuseExecutor will cache jdbc Statements with equivalent SQL, improving performance
+    // slightly when redundant queries are made.
+    configuration.setDefaultExecutorType(ExecutorType.REUSE);
+
     addMappedStatement(
         configuration,
         createStatementName,
@@ -214,6 +220,10 @@ class DbStorage extends AbstractIdleService implements Storage {
     for (JobUpdateAction action : JobUpdateAction.values()) {
       enumValueMapper.addEnumValue("job_instance_update_actions", action.getValue(), action.name());
     }
+
+    for (ScheduleStatus status : ScheduleStatus.values()) {
+      enumValueMapper.addEnumValue("task_states", status.getValue(), status.name());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
new file mode 100644
index 0000000..76f65da
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.Container;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskConstraint;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Query.Builder;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.db.views.AssignedPort;
+import org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper;
+import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A task store implementation based on a relational database.
+ * <p>
+ * TODO(wfarner): Consider modifying code generator to support directly producing ITaskConfig, etc
+ * from myBatis (it will set private final fields just fine).  This would reduce memory and time
+ * spent translating and copying objects.
+ */
+class DbTaskStore implements TaskStore.Mutable {
+
+  private static final Logger LOG = Logger.getLogger(DbTaskStore.class.getName());
+
+  private final TaskMapper taskMapper;
+  private final TaskConfigManager configManager;
+  private final JobKeyMapper jobKeyMapper;
+  private final Clock clock;
+  private final long slowQueryThresholdNanos;
+
+  @Inject
+  DbTaskStore(
+      TaskMapper taskMapper,
+      TaskConfigManager configManager,
+      JobKeyMapper jobKeyMapper,
+      Clock clock,
+      Amount<Long, Time> slowQueryThreshold) {
+
+    LOG.warning("DbTaskStore is experimental, and should not be used in production clusters!");
+    this.taskMapper = requireNonNull(taskMapper);
+    this.configManager = requireNonNull(configManager);
+    this.jobKeyMapper = requireNonNull(jobKeyMapper);
+    this.clock = requireNonNull(clock);
+    this.slowQueryThresholdNanos =  slowQueryThreshold.as(Time.NANOSECONDS);
+  }
+
+  @Timed("db_storage_fetch_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> fetchTasks(Builder query) {
+    requireNonNull(query);
+
+    // TODO(wfarner): Consider making slow query logging more reusable, or pushing it down into the
+    //                database.
+    long start = clock.nowNanos();
+    ImmutableSet<IScheduledTask> result = matches(query).toSet();
+    long durationNanos = clock.nowNanos() - start;
+    Level level = durationNanos >= slowQueryThresholdNanos ? Level.INFO : Level.FINE;
+    if (LOG.isLoggable(level)) {
+      Long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
+      LOG.log(level, "Query took " + time + " ms: " + query.get());
+    }
+
+    return result;
+  }
+
+  private static final Function<TaskConfigRow, Long> CONFIG_ID =
+      new Function<TaskConfigRow, Long>() {
+        @Override
+        public Long apply(TaskConfigRow row) {
+          return row.getId();
+        }
+      };
+
+  /**
+   * Computes an association between config table row ID and {@link ITaskConfig} object for all
+   * configs in the provided jobs.
+   *
+   * @param jobs Jobs to fetch task configs for.
+   * @return A mutable bi-map between row ID and task config.
+   */
+  private Map<ITaskConfig, Long> getTaskConfigRows(Set<IJobKey> jobs) {
+    Function<IJobKey, Iterable<TaskConfigRow>> getRows =
+        new Function<IJobKey, Iterable<TaskConfigRow>>() {
+          @Override
+          public Iterable<TaskConfigRow> apply(IJobKey job) {
+            return configManager.getConfigs(job);
+          }
+        };
+
+    Map<ITaskConfig, TaskConfigRow> rowsToIds =
+        FluentIterable.from(jobs)
+            .transformAndConcat(getRows)
+            .uniqueIndex(Functions.compose(ITaskConfig.FROM_BUILDER, getConfigSaturator()));
+
+    return Maps.transformValues(rowsToIds, CONFIG_ID);
+  }
+
+  @Timed("db_storage_save_tasks")
+  @Override
+  public void saveTasks(Set<IScheduledTask> tasks) {
+    if (tasks.isEmpty()) {
+      return;
+    }
+
+    // TODO(wfarner): Restrict the TaskStore.Mutable methods to more specific mutations.  It would
+    //                simplify this code if we did not have to handle full object tree mutations.
+
+    deleteTasks(Tasks.ids(tasks));
+
+    // Maintain a cache of all task configs that exist for a job key so that identical entities
+    LoadingCache<ITaskConfig, Long> configCache = CacheBuilder.newBuilder()
+        .build(new CacheLoader<ITaskConfig, Long>() {
+          @Override
+          public Long load(ITaskConfig config) {
+            return configManager.insert(config);
+          }
+        });
+
+    // Seed the cache with known configs in the jobs being updated.
+    configCache.putAll(getTaskConfigRows(
+        FluentIterable.from(tasks)
+            .transform(Tasks.SCHEDULED_TO_JOB_KEY)
+            .toSet()));
+
+    for (IScheduledTask task : tasks) {
+      jobKeyMapper.merge(task.getAssignedTask().getTask().getJob().newBuilder());
+      long configId = configCache.getUnchecked(task.getAssignedTask().getTask());
+
+      ScheduledTaskWrapper wrappedTask = new ScheduledTaskWrapper(-1, configId, task.newBuilder());
+      taskMapper.insertScheduledTask(wrappedTask);
+      Preconditions.checkState(
+          wrappedTask.getTaskRowId() != -1,
+          "Row ID should have been populated during insert.");
+      if (!task.getTaskEvents().isEmpty()) {
+        taskMapper.insertTaskEvents(wrappedTask.getTaskRowId(), task.getTaskEvents());
+      }
+      if (!task.getAssignedTask().getAssignedPorts().isEmpty()) {
+        taskMapper.insertPorts(
+            wrappedTask.getTaskRowId(),
+            toAssignedPorts(task.getAssignedTask().getAssignedPorts()));
+      }
+    }
+  }
+
+  private static List<AssignedPort> toAssignedPorts(Map<String, Integer> ports) {
+    // Mybatis does not seem to support inserting maps where the keys are not known in advance (it
+    // treats them as bags of properties, presumably like a cheap bean object).
+    // See https://github.com/mybatis/mybatis-3/pull/208, and seemingly-relevant code in
+    // https://github.com/mybatis/mybatis-3/blob/4cfc129938fd6b5cb20c4b741392e8b3fa41b529/src
+    // main/java/org/apache/ibatis/scripting/xmltags/ForEachSqlNode.java#L73-L77.
+    ImmutableList.Builder<AssignedPort> list = ImmutableList.builder();
+    for (Map.Entry<String, Integer> entry : ports.entrySet()) {
+      list.add(new AssignedPort(entry.getKey(), entry.getValue()));
+    }
+    return list.build();
+  }
+
+  @Timed("db_storage_delete_all_tasks")
+  @Override
+  public void deleteAllTasks() {
+    // TODO(wfarner): Need to re-evaluate all task configs after deleting tasks.
+    taskMapper.truncate();
+  }
+
+  @Timed("db_storage_delete_tasks")
+  @Override
+  public void deleteTasks(Set<String> taskIds) {
+    if (!taskIds.isEmpty()) {
+      // First fetch task configs referenced by these task IDs.
+      List<Long> configIds = configManager.getTaskConfigIds(taskIds);
+
+      taskMapper.deleteTasks(taskIds);
+
+      if (!configIds.isEmpty()) {
+        configManager.maybeExpungeConfigs(ImmutableSet.copyOf(configIds));
+      }
+    }
+  }
+
+  @Timed("db_storage_mutate_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    requireNonNull(query);
+    requireNonNull(mutator);
+
+    ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
+    for (IScheduledTask original : fetchTasks(query)) {
+      IScheduledTask maybeMutated = mutator.apply(original);
+      if (!original.equals(maybeMutated)) {
+        Preconditions.checkState(
+            Tasks.id(original).equals(Tasks.id(maybeMutated)),
+            "A task's ID may not be mutated.");
+        saveTasks(ImmutableSet.of(maybeMutated));
+        mutated.add(maybeMutated);
+      }
+    }
+
+    return mutated.build();
+  }
+
+  @Timed("db_storage_unsafe_modify_in_place")
+  @Override
+  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
+    checkNotNull(taskId);
+    checkNotNull(taskConfiguration);
+    Optional<IScheduledTask> task =
+        Optional.fromNullable(Iterables.getOnlyElement(fetchTasks(Query.taskScoped(taskId)), null));
+    if (task.isPresent()) {
+      deleteTasks(ImmutableSet.of(taskId));
+      ScheduledTask builder = task.get().newBuilder();
+      builder.getAssignedTask().setTask(taskConfiguration.newBuilder());
+      saveTasks(ImmutableSet.of(IScheduledTask.build(builder)));
+      return true;
+    }
+    return false;
+  }
+
+  private Function<TaskConfigRow, TaskConfig> getConfigSaturator() {
+    // It appears that there is no way in mybatis to populate a field of type Map.  To work around
+    // this, we need to manually perform the query and associate the elements.
+    final LoadingCache<Long, Map<String, String>> taskLinkCache = CacheBuilder.newBuilder()
+        .build(new CacheLoader<Long, Map<String, String>>() {
+          @Override
+          public Map<String, String> load(Long configId) {
+            return configManager.getTaskLinks(configId);
+          }
+        });
+    Function<TaskConfigRow, TaskConfig> linkPopulator = new Function<TaskConfigRow, TaskConfig>() {
+      @Override
+      public TaskConfig apply(TaskConfigRow row) {
+        return row.getConfig().setTaskLinks(taskLinkCache.getUnchecked(row.getId()));
+      }
+    };
+
+    return Functions.compose(REPLACE_UNION_TYPES, linkPopulator);
+  }
+
+  private FluentIterable<ScheduledTaskWrapper> fetchRows(Query.Builder query) {
+    final Function<TaskConfigRow, TaskConfig> configSaturator = getConfigSaturator();
+    return FluentIterable.from(taskMapper.select(query.get()))
+        .transform(populateAssignedPorts)
+        .transform(new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() {
+          @Override
+          public ScheduledTaskWrapper apply(ScheduledTaskWrapper task) {
+            configSaturator.apply(
+                new TaskConfigRow(
+                    task.getTaskConfigRowId(),
+                    task.getTask().getAssignedTask().getTask()));
+            return task;
+          }
+        });
+  }
+
+  private final Function<ScheduledTaskWrapper, ScheduledTaskWrapper> populateAssignedPorts =
+      new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() {
+        @Override
+        public ScheduledTaskWrapper apply(ScheduledTaskWrapper task) {
+          ImmutableMap.Builder<String, Integer> ports = ImmutableMap.builder();
+          for (AssignedPort port : taskMapper.selectPorts(task.getTaskRowId())) {
+            ports.put(port.getName(), port.getPort());
+          }
+          task.getTask().getAssignedTask().setAssignedPorts(ports.build());
+          return task;
+        }
+      };
+
+  private FluentIterable<IScheduledTask> matches(Query.Builder query) {
+    return fetchRows(query)
+        .transform(UNWRAP)
+        .transform(IScheduledTask.FROM_BUILDER);
+  }
+
+  private static final Function<ScheduledTaskWrapper, ScheduledTask> UNWRAP =
+      new Function<ScheduledTaskWrapper, ScheduledTask>() {
+        @Override
+        public ScheduledTask apply(ScheduledTaskWrapper task) {
+          return task.getTask();
+        }
+      };
+
+  /**
+   * Replaces the shimmed {@link org.apache.thrift.TUnion} instances with the base thrift types.
+   * This is necessary because TUnion, as of thrift 0.9.1, restricts subclassing.  The copy
+   * constructor checks for equality on {@link Object#getClass()} rather than the subclass-friendly
+   * {@link Class#isInstance(Object).
+   */
+  private static final Function<TaskConfig, TaskConfig> REPLACE_UNION_TYPES =
+      new Function<TaskConfig, TaskConfig>() {
+        @Override
+        public TaskConfig apply(TaskConfig config) {
+          ImmutableSet.Builder<Constraint> constraints = ImmutableSet.builder();
+          for (Constraint constraint : config.getConstraints()) {
+            Constraint replacement = new Constraint()
+                .setName(constraint.getName());
+            replacement.setConstraint(
+                new TaskConstraint(
+                    constraint.getConstraint().getSetField(),
+                    constraint.getConstraint().getFieldValue()));
+            constraints.add(replacement);
+          }
+          config.setConstraints(constraints.build());
+
+          config.setContainer(
+              new Container(
+                  config.getContainer().getSetField(),
+                  config.getContainer().getFieldValue()));
+
+          return config;
+        }
+      };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
index 7b4067c..fe8e3f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
@@ -17,6 +17,8 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -45,6 +47,7 @@ public final class DbUtil {
             FakeStatsProvider stats = new FakeStatsProvider();
             bind(StatsProvider.class).toInstance(stats);
             bind(FakeStatsProvider.class).toInstance(stats);
+            bind(Clock.class).toInstance(new FakeClock());
           }
         });
     Storage storage = injector.getInstance(Storage.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
new file mode 100644
index 0000000..3ada628
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow;
+import org.apache.aurora.scheduler.storage.db.views.TaskLink;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
+
+import static java.util.Objects.requireNonNull;
+
+class TaskConfigManager {
+  private final TaskConfigMapper configMapper;
+
+  @Inject
+  TaskConfigManager(TaskConfigMapper configMapper) {
+    this.configMapper = requireNonNull(configMapper);
+  }
+
+  long insert(ITaskConfig config) {
+    InsertResult configInsert = new InsertResult();
+    configMapper.insert(config, configInsert);
+    for (IConstraint constraint : config.getConstraints()) {
+      InsertResult constraintResult = new InsertResult();
+      configMapper.insertConstraint(configInsert.getId(), constraint, constraintResult);
+      switch (constraint.getConstraint().getSetField()) {
+        case VALUE:
+          IValueConstraint valueConstraint = constraint.getConstraint().getValue();
+          InsertResult valueResult = new InsertResult();
+          configMapper.insertValueConstraint(
+              constraintResult.getId(),
+              valueConstraint,
+              valueResult);
+          configMapper.insertValueConstraintValues(
+              valueResult.getId(),
+              valueConstraint.getValues());
+          break;
+
+        case LIMIT:
+          configMapper.insertLimitConstraint(
+              constraintResult.getId(),
+              constraint.getConstraint().getLimit());
+          break;
+
+        default:
+          throw new IllegalStateException(
+              "Unhandled constraint type " + constraint.getConstraint().getSetField());
+      }
+    }
+
+    if (!config.getRequestedPorts().isEmpty()) {
+      configMapper.insertRequestedPorts(configInsert.getId(), config.getRequestedPorts());
+    }
+
+    if (!config.getTaskLinks().isEmpty()) {
+      configMapper.insertTaskLinks(
+          configInsert.getId(),
+          FluentIterable.from(config.getTaskLinks().entrySet())
+              .transform(TO_LINK)
+              .toList());
+    }
+
+    if (!config.getMetadata().isEmpty()) {
+      configMapper.insertMetadata(configInsert.getId(), config.getMetadata());
+    }
+
+    // TODO(wfarner): It would be nice if this generalized to different Container types.
+    if (config.getContainer().isSetDocker()) {
+      configMapper.insertContainer(configInsert.getId(), config.getContainer().getDocker());
+    }
+
+    return configInsert.getId();
+  }
+
+  Map<String, String> getTaskLinks(long configId) {
+    ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
+    for (TaskLink link : configMapper.selectTaskLinks(configId)) {
+      links.put(link.getLabel(), link.getUrl());
+    }
+    return links.build();
+  }
+
+  List<TaskConfigRow> getConfigs(IJobKey job) {
+    requireNonNull(job);
+    return configMapper.selectConfigsByJob(job);
+  }
+
+  List<Long> getTaskConfigIds(Set<String> scheduledTaskIds) {
+    requireNonNull(scheduledTaskIds);
+    return configMapper.selectConfigsByTaskId(scheduledTaskIds);
+  }
+
+  /**
+   * Performs reference counting on configurations.  If there are no longer any references to
+   * these configuration rows, they will be deleted.
+   * TODO(wfarner): Should we rely on foreign key constraints and opportunistically delete?
+   *
+   * @param configRowIds Configurations to delete if no references are found.
+   */
+  void maybeExpungeConfigs(Set<Long> configRowIds) {
+    if (configMapper.selectTasksByConfigId(configRowIds).isEmpty()) {
+      configMapper.delete(configRowIds);
+
+      // TODO(wfarner): Need to try removal from other tables as well, e.g. job keys.
+    }
+  }
+
+  private static final Function<Map.Entry<String, String>, TaskLink> TO_LINK =
+      new Function<Map.Entry<String, String>, TaskLink>() {
+        @Override
+        public TaskLink apply(Map.Entry<String, String> entry) {
+          return new TaskLink(entry.getKey(), entry.getValue());
+        }
+      };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java
new file mode 100644
index 0000000..7ee001f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow;
+import org.apache.aurora.scheduler.storage.db.views.TaskLink;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.IDockerContainer;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ILimitConstraint;
+import org.apache.aurora.scheduler.storage.entities.IMetadata;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * MyBatis mapper for task config objects.
+ */
+interface TaskConfigMapper {
+
+  /**
+   * Inserts fields from a task config into the {@code task_configs} table.
+   *
+   * @param config Configuration to insert.
+   * @param result Container for auto-generated ID of the inserted row.
+   */
+  void insert(
+      @Param("config") ITaskConfig config,
+      @Param("result") InsertResult result);
+
+  /**
+   * Gets all task config rows referenced by a job.
+   *
+   * @param job Job to look up.
+   * @return Task config row container.
+   */
+  List<TaskConfigRow> selectConfigsByJob(IJobKey job);
+
+  /**
+   * Looks up task config IDs by task IDs.
+   *
+   * @param taskIds Task IDs to look up.
+   * @return Task config row IDs.
+   */
+  List<Long> selectConfigsByTaskId(@Param("taskIds") Set<String> taskIds);
+
+  /**
+   * Looks up task config IDs by id.
+   *
+   * @param configIds Task config IDs.
+   * @return Task config row IDs.
+   */
+  List<Long> selectTasksByConfigId(@Param("configIds") Set<Long> configIds);
+
+  /**
+   * Inserts the constraint association within an {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @param constraint Constraint to insert.
+   * @param result Container for auto-generated ID of the inserted row.
+   */
+  void insertConstraint(
+      @Param("configId") long configId,
+      @Param("constraint") IConstraint constraint,
+      @Param("result") InsertResult result);
+
+  /**
+   * Inserts the limit constraint association within an {@link IConstraint}.
+   *
+   * @param constraintId Constraint ID.
+   * @param constraint Constraint to insert.
+   */
+  void insertLimitConstraint(
+      @Param("constraintId") long constraintId,
+      @Param("constraint") ILimitConstraint constraint);
+
+  /**
+   * Inserts the value constraint association within an {@link IConstraint}.
+   *
+   * @param constraintId Constraint ID.
+   * @param constraint Constraint to insert.
+   * @param result Container for auto-generated ID of the inserted row.
+   */
+  void insertValueConstraint(
+      @Param("constraintId") long constraintId,
+      @Param("constraint") IValueConstraint constraint,
+      @Param("result") InsertResult result);
+
+  /**
+   * Inserts the values association within an {@link IValueConstraint}.
+   *
+   * @param valueConstraintId Value constraint ID.
+   * @param values Values to insert.
+   */
+  void insertValueConstraintValues(
+      @Param("valueConstraintId") long valueConstraintId,
+      @Param("values") Set<String> values);
+
+  /**
+   * Inserts the requested ports association within an {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @param ports Port names to insert.
+   */
+  void insertRequestedPorts(
+      @Param("configId") long configId,
+      @Param("ports") Set<String> ports);
+
+  /**
+   * Inserts the task links association within an {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @param links Task links to insert.
+   */
+  void insertTaskLinks(
+      @Param("configId") long configId,
+      @Param("links") List<TaskLink> links);
+
+  /**
+   * Selects the task links associated with a {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @return Links associated with the task config.
+   */
+  List<TaskLink> selectTaskLinks(@Param("configId") long configId);
+
+  /**
+   * Inserts the container association within an {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @param container Container to insert.
+   */
+  void insertContainer(
+      @Param("configId") long configId,
+      @Param("container") IDockerContainer container);
+
+  /**
+   * Inserts the metadata association within an {@link ITaskConfig}.
+   *
+   * @param configId Task config ID.
+   * @param metadata Metadata associated with the task config.
+   */
+  void insertMetadata(
+      @Param("configId") long configId,
+      @Param("metadata") Set<IMetadata> metadata);
+
+  /**
+   * Deletes task configs.
+   *
+   * @param configIds Configs to delete.
+   */
+  void delete(@Param("configIds") Set<Long> configIds);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java
new file mode 100644
index 0000000..9903675
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.storage.db.views.AssignedPort;
+import org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * MyBatis mapper for scheduled tasks.
+ */
+interface TaskMapper {
+
+  /**
+   * Inserts a scheduled task.
+   *
+   * @param task Task to insert.
+   */
+  void insertScheduledTask(ScheduledTaskWrapper task);
+
+  /**
+   * Gets tasks based on a query.
+   *
+   * @param query Query to use as a filter for tasks.
+   * @return Tasks matching the query.
+   */
+  List<ScheduledTaskWrapper> select(TaskQuery query);
+
+  /**
+   * Inserts the task events association within an
+   * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}.
+   *
+   * @param taskRowId Task row ID.
+   * @param events Events to insert.
+   */
+  void insertTaskEvents(
+      @Param("taskRowId") long taskRowId,
+      @Param("events") List<ITaskEvent> events);
+
+  /**
+   * Inserts the assigned ports association within an
+   * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}.
+   *
+   * @param taskRowId Task row ID.
+   * @param ports Assigned ports to insert.
+   */
+  void insertPorts(@Param("taskRowId") long taskRowId, @Param("ports") List<AssignedPort> ports);
+
+  /**
+   * Selects the assigned ports association within an
+   * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}.
+   *
+   * @param taskRowId Task row ID.
+   * @return Ports associated with the task.
+   */
+  List<AssignedPort> selectPorts(@Param("taskRowId") long taskRowId);
+
+  /**
+   * Deletes all task rows.
+   */
+  void truncate();
+
+  /**
+   * Deletes task rows by ID.
+   *
+   * @param taskIds IDs of tasks to delete.
+   */
+  void deleteTasks(@Param("taskIds") Set<String> taskIds);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java
new file mode 100644
index 0000000..07a991d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.shims;
+
+import org.apache.aurora.gen.Container;
+import org.apache.aurora.gen.DockerContainer;
+import org.apache.aurora.gen.MesosContainer;
+
+/**
+ * An extension of {@link Container} that does not throw {@link NullPointerException} when
+ * accessors are called on unset fields.
+ */
+public class ContainerShim extends Container {
+  @Override
+  public DockerContainer getDocker() {
+    if (isSet(_Fields.DOCKER)) {
+      return super.getDocker();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public MesosContainer getMesos() {
+    if (isSet(_Fields.MESOS)) {
+      return super.getMesos();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java
new file mode 100644
index 0000000..4990af7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.shims;
+
+import org.apache.aurora.gen.LimitConstraint;
+import org.apache.aurora.gen.TaskConstraint;
+import org.apache.aurora.gen.ValueConstraint;
+
+/**
+ * An extension of {@link TaskConstraint} that does not throw {@link NullPointerException} when
+ * accessors are called on unset fields.
+ */
+public class TaskConstraintShim extends TaskConstraint {
+  @Override
+  public ValueConstraint getValue() {
+    if (isSet(_Fields.VALUE)) {
+      return super.getValue();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public LimitConstraint getLimit() {
+    if (isSet(_Fields.LIMIT)) {
+      return super.getLimit();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java
new file mode 100644
index 0000000..1f203b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.typehandlers;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * Type handler for {@link ScheduleStatus}.
+ */
+class ScheduleStatusTypeHandler extends AbstractTEnumTypeHandler<ScheduleStatus> {
+  @Override
+  protected ScheduleStatus fromValue(int value) {
+    return ScheduleStatus.findByValue(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java
index 4d0c10d..0a519be 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java
@@ -32,6 +32,7 @@ public final class TypeHandlers {
         JobUpdateActionTypeHandler.class,
         JobUpdateStatusTypeHandler.class,
         MaintenanceModeTypeHandler.class,
+        ScheduleStatusTypeHandler.class,
         TaskConfigTypeHandler.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java
new file mode 100644
index 0000000..0c6442c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.views;
+
+/**
+ * Representation of a row in the task_ports table.
+ */
+public class AssignedPort {
+  private final String name;
+  private final int port;
+
+  private AssignedPort() {
+    // Needed by mybatis.
+    this(null, -1);
+  }
+
+  public AssignedPort(String name, int port) {
+    this.name = name;
+    this.port = port;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getPort() {
+    return port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java
new file mode 100644
index 0000000..b89e7b5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.views;
+
+import org.apache.aurora.gen.ScheduledTask;
+
+/**
+ * Representation of a row in the tasks table.
+ */
+public class ScheduledTaskWrapper {
+  private final long taskRowId;
+  private final long taskConfigRowId;
+  private final ScheduledTask task;
+
+  private ScheduledTaskWrapper() {
+    // Needed by mybatis.
+    this(-1, -1, null);
+  }
+
+  public ScheduledTaskWrapper(long taskRowId, long taskConfigRowId, ScheduledTask task) {
+    this.taskRowId = taskRowId;
+    this.taskConfigRowId = taskConfigRowId;
+    this.task = task;
+  }
+
+  public long getTaskRowId() {
+    return taskRowId;
+  }
+
+  public long getTaskConfigRowId() {
+    return taskConfigRowId;
+  }
+
+  public ScheduledTask getTask() {
+    return task;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java
new file mode 100644
index 0000000..0160ae3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.views;
+
+import java.util.Objects;
+
+import org.apache.aurora.gen.TaskConfig;
+
+/**
+ * Representation of a row in the task_configs table.
+ */
+public class TaskConfigRow {
+  private final long id;
+  private final TaskConfig config;
+
+  private TaskConfigRow() {
+    // Required for mybatis.
+    this(-1, null);
+  }
+
+  public TaskConfigRow(long id, TaskConfig config) {
+    this.id = id;
+    this.config = config;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public TaskConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof TaskConfigRow)) {
+      return false;
+    }
+
+    TaskConfigRow other = (TaskConfigRow) obj;
+    return Objects.equals(id, other.id)
+        && Objects.equals(config, other.config);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java
new file mode 100644
index 0000000..52b09a9
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db.views;
+
+/**
+ * Representation of a row in the task_config_task_links table.
+ */
+public class TaskLink {
+  private final String label;
+  private final String url;
+
+  private TaskLink() {
+    // Needed by mybatis.
+    this(null, null);
+  }
+
+  public TaskLink(String label, String url) {
+    this.label = label;
+    this.url = url;
+  }
+
+  public String getLabel() {
+    return label;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
index 21f7d4d..35c83b9 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java
@@ -17,6 +17,7 @@ import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
+import com.google.inject.PrivateModule;
 import com.twitter.common.inject.Bindings.KeyFactory;
 
 import org.apache.aurora.scheduler.storage.CronJobStore;
@@ -47,6 +48,30 @@ public final class InMemStoresModule extends AbstractModule {
   @Override
   protected void configure() {
     bindStore(CronJobStore.Mutable.class, MemJobStore.class);
-    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+  }
+
+  /**
+   * Binding module that installs the map-based task store implementation.
+   */
+  public static class TaskStoreModule extends PrivateModule {
+    private final KeyFactory keyFactory;
+
+    public TaskStoreModule(KeyFactory keyFactory) {
+      this.keyFactory = requireNonNull(keyFactory);
+    }
+
+    private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+      bind(binding).to(impl);
+      bind(impl).in(Singleton.class);
+      Key<T> key = keyFactory.create(binding);
+      bind(key).to(impl);
+      expose(key);
+    }
+
+    @Override
+    protected void configure() {
+      bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+      expose(TaskStore.Mutable.class);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index f76f9a9..cf31cf8 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -299,13 +299,11 @@
         AND j.name = #{jobKey.name}
         AND j.environment = #{jobKey.environment}
       </if>
-      <if test="updateStatuses != null">
-        <if test="updateStatuses.size() > 0">
-          AND (max_status.status IN
-          <foreach item="element" collection="updateStatuses" open="(" separator="," close="))">
-            #{element, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler}
-          </foreach>
-        </if>
+      <if test="updateStatuses != null and !updateStatuses.isEmpty()">
+        AND (max_status.status IN
+        <foreach item="element" collection="updateStatuses" open="(" separator="," close="))">
+          #{element, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler}
+        </foreach>
       </if>
     </if>
     ORDER BY max_ts.timestamp_ms DESC


Mime
View raw message