aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [4/5] aurora git commit: Exclusively use Map-based in-memory stores for primary storage
Date Wed, 25 Oct 2017 06:34:18 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
new file mode 100644
index 0000000..d190add
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
+
+public class MemJobUpdateStore implements JobUpdateStore.Mutable {
+
+  private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER = Ordering.natural()
+      .reverse()
+      .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
+
+  private final Map<IJobUpdateKey, UpdateAndLock> updates = Maps.newConcurrentMap();
+  private final LockStore lockStore;
+  private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
+  private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
+
+  /**
+   * Creates an empty update store and eagerly creates one counter per job update status and
+   * one per job update action.
+   *
+   * @param lockStore Lock store consulted when reconciling lock tokens held by stored updates.
+   * @param statsProvider Provider used to create the event counters.
+   */
+  @Inject
+  public MemJobUpdateStore(LockStore.Mutable lockStore, StatsProvider statsProvider) {
+    this.lockStore = lockStore;
+
+    LoadingCache<JobUpdateStatus, AtomicLong> statusCounters = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateStatus key) {
+            return statsProvider.makeCounter(jobUpdateStatusStatName(key));
+          }
+        });
+    // Load every key up front so that each status counter exists before any event is saved.
+    for (JobUpdateStatus status : JobUpdateStatus.values()) {
+      statusCounters.getUnchecked(status).get();
+    }
+    this.jobUpdateEventStats = statusCounters;
+
+    LoadingCache<JobUpdateAction, AtomicLong> actionCounters = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateAction key) {
+            return statsProvider.makeCounter(jobUpdateActionStatName(key));
+          }
+        });
+    // Same eager creation for the per-action counters.
+    for (JobUpdateAction action : JobUpdateAction.values()) {
+      actionCounters.getUnchecked(action).get();
+    }
+    this.jobUpdateActionStats = actionCounters;
+  }
+
+  /**
+   * Fetches the summaries of all updates matching a query.
+   *
+   * @param query Filtering and paging criteria.
+   * @return Summaries of the matching updates, in the order produced by the query.
+   */
+  @Timed("job_update_store_fetch_summaries")
+  @Override
+  public synchronized List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
+    Stream<IJobUpdateSummary> summaries = performQuery(query)
+        .map(IJobUpdateDetails::getUpdate)
+        .map(IJobUpdate::getSummary);
+    return summaries.collect(Collectors.toList());
+  }
+
+  /**
+   * Fetches the full details of all updates matching a query.
+   *
+   * @param query Filtering and paging criteria.
+   * @return Details of the matching updates, in the order produced by the query.
+   */
+  @Timed("job_update_store_fetch_details_list")
+  @Override
+  public synchronized List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
+    Stream<IJobUpdateDetails> matches = performQuery(query);
+    return matches.collect(Collectors.toList());
+  }
+
+  @Timed("job_update_store_fetch_details")
+  @Override
+  public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
+    return Optional.fromNullable(updates.get(key)).transform(u -> u.details);
+  }
+
+  @Timed("job_update_store_fetch_update")
+  @Override
+  public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+    return Optional.fromNullable(updates.get(key)).transform(u -> u.details.getUpdate());
+  }
+
+  @Timed("job_update_store_fetch_instructions")
+  @Override
+  public synchronized Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(
+      IJobUpdateKey key) {
+
+    return Optional.fromNullable(updates.get(key))
+        .transform(u -> u.details.getUpdate().getInstructions());
+  }
+
+  /**
+   * Clears the lock token of every stored update whose token no longer matches the lock store.
+   * This simulates the database behavior of a join performed against locks, which was used to
+   * populate the lockToken field.
+   */
+  private void refreshLocks() {
+    ImmutableMap.Builder<IJobUpdateKey, UpdateAndLock> cleared = ImmutableMap.builder();
+    for (Map.Entry<IJobUpdateKey, UpdateAndLock> entry : updates.entrySet()) {
+      IJobUpdateKey key = entry.getKey();
+      IJobUpdateDetails details = entry.getValue().details;
+      Optional<String> heldToken = entry.getValue().lockToken;
+      if (!heldToken.isPresent()) {
+        // Nothing to reconcile for updates that hold no token.
+        continue;
+      }
+
+      // The token may only remain if the lock store has a lock for the job with the same token.
+      Optional<String> currentToken = Optional.fromNullable(
+          lockStore.fetchLock(ILockKey.build(LockKey.job(key.getJob().newBuilder())))
+              .map(ILock::getToken)
+              .orElse(null));
+      // A present token never equals an absent one, so this also covers a missing lock.
+      if (!heldToken.equals(currentToken)) {
+        cleared.put(key, new UpdateAndLock(details, Optional.absent()));
+      }
+    }
+
+    // Applied after iteration so the map is not modified while it is being traversed.
+    updates.putAll(cleared.build());
+  }
+
+  @Timed("job_update_store_fetch_all_details")
+  @Override
+  public synchronized Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+    refreshLocks();
+    return updates.values().stream()
+        .map(u -> new StoredJobUpdateDetails(u.details.newBuilder(), u.lockToken.orNull()))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Fetches the events recorded for one instance of an update.
+   *
+   * @param key Update to look up.
+   * @param instanceId Instance whose events should be returned.
+   * @return Matching events in stored order; empty when no update is stored under {@code key}.
+   */
+  @Timed("job_update_store_fetch_instance_events")
+  @Override
+  public synchronized List<IJobInstanceUpdateEvent> fetchInstanceEvents(
+      IJobUpdateKey key,
+      int instanceId) {
+
+    java.util.Optional<UpdateAndLock> stored = java.util.Optional.ofNullable(updates.get(key));
+    return stored
+        .map(entry -> entry.details.getInstanceEvents())
+        .orElse(ImmutableList.of())
+        .stream()
+        .filter(event -> event.getInstanceId() == instanceId)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Checks that update instructions are complete enough to be stored.
+   *
+   * @param instructions Instructions to validate.
+   * @throws IllegalArgumentException If neither an initial nor a desired state is present, or
+   *     if an initial state entry has no instances.
+   * @throws NullPointerException If an initial state entry has no task.  The desired state, when
+   *     present, is additionally checked with {@code MorePreconditions.checkNotBlank} and
+   *     {@code Preconditions.checkNotNull}.
+   */
+  private static void validateInstructions(IJobUpdateInstructions instructions) {
+    if (!instructions.isSetDesiredState() && instructions.getInitialState().isEmpty()) {
+      throw new IllegalArgumentException(
+          "Missing both initial and desired states. At least one is required.");
+    }
+
+    if (!instructions.getInitialState().isEmpty()) {
+      // Every initial state entry needs a task and at least one instance range.
+      if (instructions.getInitialState().stream().anyMatch(t -> t.getTask() == null)) {
+        throw new NullPointerException("Invalid initial instance state.");
+      }
+      Preconditions.checkArgument(
+          instructions.getInitialState().stream().noneMatch(t -> t.getInstances().isEmpty()),
+          "Invalid initial instance state ranges.");
+    }
+
+    if (instructions.getDesiredState() != null) {
+      MorePreconditions.checkNotBlank(instructions.getDesiredState().getInstances());
+      Preconditions.checkNotNull(instructions.getDesiredState().getTask());
+    }
+  }
+
+  /**
+   * Stores a new update with empty event histories.
+   *
+   * @param update Update to store; its instructions are validated first.
+   * @param lockToken Token of the lock held by the update, kept alongside the update.
+   * @throws StorageException If an update with the same key is already stored.
+   */
+  @Timed("job_update_store_save_update")
+  @Override
+  public synchronized void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
+    requireNonNull(update);
+    validateInstructions(update.getInstructions());
+
+    IJobUpdateKey key = update.getSummary().getKey();
+    if (updates.containsKey(key)) {
+      throw new StorageException("Update already exists: " + key);
+    }
+
+    JobUpdateDetails details = new JobUpdateDetails()
+        .setUpdate(update.newBuilder())
+        .setUpdateEvents(ImmutableList.of())
+        .setInstanceEvents(ImmutableList.of());
+    // The summary state is always derived from the stored events.
+    details.getUpdate().getSummary().setState(synthesizeUpdateState(details));
+
+    updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(details), lockToken));
+  }
+
+  // Orders update events from oldest to newest timestamp.
+  private static final Ordering<JobUpdateEvent> EVENT_ORDERING = Ordering.natural()
+      .onResultOf(JobUpdateEvent::getTimestampMs);
+
+  /**
+   * Appends an update event to a stored update, re-derives the summary state and increments
+   * the counter for the event's status.
+   *
+   * @param key Update the event belongs to.
+   * @param event Event to record.
+   * @throws StorageException If no update is stored under {@code key}.
+   */
+  @Timed("job_update_store_save_event")
+  @Override
+  public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    UpdateAndLock existing = updates.get(key);
+    if (existing == null) {
+      throw new StorageException("Update not found: " + key);
+    }
+
+    JobUpdateDetails details = existing.details.newBuilder();
+    details.addToUpdateEvents(event.newBuilder());
+    // Keep events sorted by timestamp; the state synthesis reads the first and last entries.
+    List<JobUpdateEvent> chronological = EVENT_ORDERING.sortedCopy(details.getUpdateEvents());
+    details.setUpdateEvents(chronological);
+    details.getUpdate().getSummary().setState(synthesizeUpdateState(details));
+
+    UpdateAndLock replacement =
+        new UpdateAndLock(IJobUpdateDetails.build(details), existing.lockToken);
+    updates.put(key, replacement);
+    jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
+  }
+
+  // Orders instance events from oldest to newest timestamp.
+  private static final Ordering<JobInstanceUpdateEvent> INSTANCE_EVENT_ORDERING = Ordering.natural()
+      .onResultOf(JobInstanceUpdateEvent::getTimestampMs);
+
+  /**
+   * Appends an instance event to a stored update, re-derives the summary state and increments
+   * the counter for the event's action.
+   *
+   * @param key Update the event belongs to.
+   * @param event Instance event to record.
+   * @throws StorageException If no update is stored under {@code key}.
+   */
+  @Timed("job_update_store_save_instance_event")
+  @Override
+  public synchronized void saveJobInstanceUpdateEvent(
+      IJobUpdateKey key,
+      IJobInstanceUpdateEvent event) {
+
+    UpdateAndLock existing = updates.get(key);
+    if (existing == null) {
+      throw new StorageException("Update not found: " + key);
+    }
+
+    JobUpdateDetails details = existing.details.newBuilder();
+    details.addToInstanceEvents(event.newBuilder());
+    // Keep events sorted by timestamp; the state synthesis reads the last entry.
+    List<JobInstanceUpdateEvent> chronological =
+        INSTANCE_EVENT_ORDERING.sortedCopy(details.getInstanceEvents());
+    details.setInstanceEvents(chronological);
+    details.getUpdate().getSummary().setState(synthesizeUpdateState(details));
+
+    UpdateAndLock replacement =
+        new UpdateAndLock(IJobUpdateDetails.build(details), existing.lockToken);
+    updates.put(key, replacement);
+    jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
+  }
+
+  /**
+   * Removes every stored update.  Events are held inside the update details, so they are
+   * removed along with the updates.
+   */
+  @Timed("job_update_store_delete_all")
+  @Override
+  public synchronized void deleteAllUpdatesAndEvents() {
+    updates.clear();
+  }
+
+  /**
+   * Removes updates in a terminal state that are either older than a threshold or exceed the
+   * number of updates retained per job.
+   *
+   * @param perJobRetainCount Maximum number of unexpired terminal updates to keep per job; the
+   *     ones with the oldest creation timestamps are pruned first.
+   * @param historyPruneThresholdMs Terminal updates created before this timestamp are pruned.
+   * @return Keys of the updates that were removed.
+   */
+  @Timed("job_update_store_prune_history")
+  @Override
+  public synchronized Set<IJobUpdateKey> pruneHistory(
+      int perJobRetainCount,
+      long historyPruneThresholdMs) {
+
+    // Only updates in a terminal state are candidates.  A supplier is used because the
+    // candidates are streamed twice below.
+    Supplier<Stream<IJobUpdateSummary>> terminalSummaries = () -> updates.values().stream()
+        .map(entry -> entry.details.getUpdate().getSummary())
+        .filter(summary -> TERMINAL_STATES.contains(summary.getState().getStatus()));
+
+    Predicate<IJobUpdateSummary> isExpired =
+        summary -> summary.getState().getCreatedTimestampMs() < historyPruneThresholdMs;
+
+    ImmutableSet.Builder<IJobUpdateKey> toPrune = ImmutableSet.builder();
+
+    // First pass: everything older than the time threshold.
+    terminalSummaries.get()
+        .filter(isExpired)
+        .map(IJobUpdateSummary::getKey)
+        .forEachOrdered(toPrune::add);
+
+    // Second pass: per-job overflow, not counting the expired updates gathered above.
+    Multimap<IJobKey, IJobUpdateSummary> unexpiredByJob = Multimaps.index(
+        terminalSummaries.get().filter(isExpired.negate()).iterator(),
+        summary -> summary.getKey().getJob());
+
+    Ordering<IJobUpdateSummary> oldestFirst = Ordering.natural()
+        .onResultOf(summary -> summary.getState().getCreatedTimestampMs());
+
+    for (Collection<IJobUpdateSummary> jobUpdates : unexpiredByJob.asMap().values()) {
+      if (jobUpdates.size() > perJobRetainCount) {
+        int excess = jobUpdates.size() - perJobRetainCount;
+        for (IJobUpdateSummary summary : oldestFirst.leastOf(jobUpdates, excess)) {
+          toPrune.add(summary.getKey());
+        }
+      }
+    }
+
+    Set<IJobUpdateKey> pruned = toPrune.build();
+    updates.keySet().removeAll(pruned);
+
+    return pruned;
+  }
+
+  /**
+   * Derives the summary state of an update from its recorded events.
+   *
+   * @param update Mutable update details whose event lists are already sorted by timestamp.
+   * @return The update's existing state object (or a new one if it had none), with the created
+   *     timestamp taken from the first update event, the status from the last update event, and
+   *     the last-modified timestamp from the later of the last update and last instance events.
+   */
+  private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) {
+    JobUpdateState existing = update.getUpdate().getSummary().getState();
+    JobUpdateState state = existing == null ? new JobUpdateState() : existing;
+
+    JobUpdateEvent earliest = Iterables.getFirst(update.getUpdateEvents(), null);
+    if (earliest != null) {
+      state.setCreatedTimestampMs(earliest.getTimestampMs());
+    }
+
+    JobUpdateEvent latest = Iterables.getLast(update.getUpdateEvents(), null);
+    if (latest != null) {
+      state.setStatus(latest.getStatus());
+      state.setLastModifiedTimestampMs(latest.getTimestampMs());
+    }
+
+    JobInstanceUpdateEvent latestInstance = Iterables.getLast(update.getInstanceEvents(), null);
+    if (latestInstance != null) {
+      // An instance event may be more recent than the last update-level event.
+      long lastModified =
+          Longs.max(state.getLastModifiedTimestampMs(), latestInstance.getTimestampMs());
+      state.setLastModifiedTimestampMs(lastModified);
+    }
+
+    return state;
+  }
+
+  /**
+   * Evaluates a query against the stored updates.
+   *
+   * @param query Criteria to apply.  Unset (null) fields and an empty status set match
+   *     everything; a limit of zero or less means no limit.
+   * @return Matching updates ordered by descending last-modified timestamp, after applying the
+   *     query's offset and limit.
+   */
+  private Stream<IJobUpdateDetails> performQuery(IJobUpdateQuery query) {
+    // Every criterion only inspects the update summary, so the predicate is built over it.
+    Predicate<IJobUpdateSummary> criteria = s -> true;
+    if (query.getRole() != null) {
+      criteria = criteria.and(s -> s.getKey().getJob().getRole().equals(query.getRole()));
+    }
+    if (query.getKey() != null) {
+      criteria = criteria.and(s -> s.getKey().equals(query.getKey()));
+    }
+    if (query.getJobKey() != null) {
+      criteria = criteria.and(s -> s.getKey().getJob().equals(query.getJobKey()));
+    }
+    if (query.getUser() != null) {
+      criteria = criteria.and(s -> s.getUser().equals(query.getUser()));
+    }
+    if (query.getUpdateStatuses() != null && !query.getUpdateStatuses().isEmpty()) {
+      criteria = criteria.and(
+          s -> query.getUpdateStatuses().contains(s.getState().getStatus()));
+    }
+    Predicate<IJobUpdateSummary> summaryMatches = criteria;
+
+    // TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as
+    // such here.  The behavior is carried over from DbJobupdateStore; determine if it is desired.
+    Stream<IJobUpdateDetails> page = updates.values().stream()
+        .map(entry -> entry.details)
+        .filter(details -> summaryMatches.test(details.getUpdate().getSummary()))
+        .sorted(REVERSE_LAST_MODIFIED_ORDER)
+        .skip(query.getOffset());
+
+    return query.getLimit() > 0 ? page.limit(query.getLimit()) : page;
+  }
+
+  /**
+   * Immutable pairing of stored update details with the lock token saved alongside them.
+   */
+  private static final class UpdateAndLock {
+    // Details of the update, including its event histories.
+    private final IJobUpdateDetails details;
+    // Token supplied when the update was saved; replaced with absent by refreshLocks() when it
+    // no longer matches the lock store.
+    private final Optional<String> lockToken;
+
+    UpdateAndLock(IJobUpdateDetails updateDetails, Optional<String> token) {
+      details = updateDetails;
+      lockToken = token;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
new file mode 100644
index 0000000..4c7bda8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * An in-memory lock store.
+ */
+class MemLockStore implements LockStore.Mutable {
+
+  private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
+
+  @Override
+  public void saveLock(ILock lock) {
+    // TODO(wfarner): Re-evaluate, this is not idempotent.
+    if (locks.containsKey(lock.getKey())) {
+      throw new StorageException("Duplicate lock key");
+    }
+    if (FluentIterable.from(locks.values())
+        .transform(ILock::getToken)
+        .anyMatch(Predicates.equalTo(lock.getToken()))) {
+
+      throw new StorageException("Duplicate token");
+    }
+
+    locks.put(lock.getKey(), lock);
+  }
+
+  @Override
+  public void removeLock(ILockKey lockKey) {
+    locks.remove(lockKey);
+  }
+
+  @Override
+  public void deleteLocks() {
+    locks.clear();
+  }
+
+  @Override
+  public Set<ILock> fetchLocks() {
+    return ImmutableSet.copyOf(locks.values());
+  }
+
+  @Override
+  public Optional<ILock> fetchLock(ILockKey lockKey) {
+    return Optional.ofNullable(locks.get(lockKey));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
new file mode 100644
index 0000000..61b9eee
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+
+/**
+ * An in-memory quota store.
+ */
+class MemQuotaStore implements QuotaStore.Mutable {
+
+  private final Map<String, IResourceAggregate> quotas = Maps.newConcurrentMap();
+
+  @Override
+  public void deleteQuotas() {
+    quotas.clear();
+  }
+
+  @Override
+  public void removeQuota(String role) {
+    quotas.remove(role);
+  }
+
+  @Override
+  public void saveQuota(String role, IResourceAggregate quota) {
+    quotas.put(role, quota);
+  }
+
+  @Override
+  public Optional<IResourceAggregate> fetchQuota(String role) {
+    return Optional.fromNullable(quotas.get(role));
+  }
+
+  @Override
+  public Map<String, IResourceAggregate> fetchQuotas() {
+    return ImmutableMap.copyOf(quotas);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
new file mode 100644
index 0000000..8a7ddd7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+
+/**
+ * An in-memory scheduler store.
+ */
+class MemSchedulerStore implements SchedulerStore.Mutable {
+  private final AtomicReference<String> frameworkId = Atomics.newReference();
+
+  @Override
+  public void saveFrameworkId(String newFrameworkId) {
+    frameworkId.set(newFrameworkId);
+  }
+
+  @Override
+  public Optional<String> fetchFrameworkId() {
+    return Optional.fromNullable(frameworkId.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
new file mode 100644
index 0000000..7ace104
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Inject;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+
+/**
+ * A storage implementation comprised of individual in-memory store implementations.
+ * <p>
+ * Reads and writes are applied directly to the same set of stores; this class itself adds no
+ * locking or transaction handling around the supplied work.
+ */
+public class MemStorage implements Storage {
+  private final MutableStoreProvider storeProvider;
+
+  /**
+   * Creates a storage that hands out the supplied stores.
+   */
+  @Inject
+  MemStorage(
+      @Volatile SchedulerStore.Mutable schedulerStore,
+      @Volatile CronJobStore.Mutable jobStore,
+      @Volatile TaskStore.Mutable taskStore,
+      @Volatile LockStore.Mutable lockStore,
+      @Volatile QuotaStore.Mutable quotaStore,
+      @Volatile AttributeStore.Mutable attributeStore,
+      @Volatile JobUpdateStore.Mutable updateStore) {
+
+    // A single provider instance is shared by all read and write operations.
+    this.storeProvider = new MutableStoreProvider() {
+      @Override
+      public SchedulerStore.Mutable getSchedulerStore() {
+        return schedulerStore;
+      }
+
+      @Override
+      public CronJobStore.Mutable getCronJobStore() {
+        return jobStore;
+      }
+
+      // The read-only and unsafe accessors both expose the same task store instance.
+      @Override
+      public TaskStore getTaskStore() {
+        return taskStore;
+      }
+
+      @Override
+      public TaskStore.Mutable getUnsafeTaskStore() {
+        return taskStore;
+      }
+
+      @Override
+      public LockStore.Mutable getLockStore() {
+        return lockStore;
+      }
+
+      @Override
+      public QuotaStore.Mutable getQuotaStore() {
+        return quotaStore;
+      }
+
+      @Override
+      public AttributeStore.Mutable getAttributeStore() {
+        return attributeStore;
+      }
+
+      @Override
+      public JobUpdateStore.Mutable getJobUpdateStore() {
+        return updateStore;
+      }
+    };
+  }
+
+  /**
+   * Runs read-only work against the stores and returns its result.
+   */
+  @Timed("mem_storage_read_operation")
+  @Override
+  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+    return work.apply(storeProvider);
+  }
+
+  /**
+   * Runs mutating work against the stores and returns its result.
+   */
+  @Timed("mem_storage_write_operation")
+  @Override
+  public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
+    return work.apply(storeProvider);
+  }
+
+  /**
+   * Does nothing; this storage needs no preparation.
+   */
+  @Override
+  public void prepare() {
+    // No-op.
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
new file mode 100644
index 0000000..2ad84eb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.inject.Bindings.KeyFactory;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.mem.MemTaskStore.SlowQueryThreshold;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for in-memory stores.
+ * <p>
+ * Each store implementation is bound as a singleton and exposed both under its plain interface
+ * and under the {@link Volatile} annotation.  {@link MemStorage} is exposed as the
+ * {@link Storage} implementation under the key produced by the configured {@link KeyFactory}.
+ */
+public final class MemStorageModule extends PrivateModule {
+
+  private final KeyFactory keyFactory;
+
+  /**
+   * Creates a module that exposes {@link Storage} under a key from {@link KeyFactory#PLAIN}.
+   */
+  public MemStorageModule() {
+    this(KeyFactory.PLAIN);
+  }
+
+  /**
+   * Creates a module that exposes {@link Storage} under a key created by the given factory.
+   *
+   * @param keyFactory Factory for the key under which {@link Storage} is bound and exposed.
+   */
+  public MemStorageModule(KeyFactory keyFactory) {
+    this.keyFactory = requireNonNull(keyFactory);
+  }
+
+  /**
+   * Binds a store interface to a singleton implementation, and exposes it both unannotated and
+   * annotated with {@link Volatile}.
+   */
+  private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+    bind(binding).to(impl);
+    bind(impl).in(Singleton.class);
+    Key<T> key = Key.get(binding, Volatile.class);
+    bind(key).to(impl);
+    expose(key);
+    expose(binding);
+  }
+
+  @Override
+  protected void configure() {
+    bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(SlowQueryThreshold.class)
+        .toInstance(Amount.of(25L, Time.MILLISECONDS));
+    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+    bindStore(CronJobStore.Mutable.class, MemCronJobStore.class);
+    bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
+    bindStore(LockStore.Mutable.class, MemLockStore.class);
+    bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
+    bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
+    bindStore(JobUpdateStore.Mutable.class, MemJobUpdateStore.class);
+
+    Key<Storage> storageKey = keyFactory.create(Storage.class);
+    bind(storageKey).to(MemStorage.class);
+    bind(MemStorage.class).in(Singleton.class);
+    expose(storageKey);
+  }
+
+  /**
+   * Creates a new empty in-memory storage for use in testing.
+   *
+   * @return A prepared storage backed by fresh stores and a {@link FakeStatsProvider}.
+   */
+  @VisibleForTesting
+  public static Storage newEmptyStorage() {
+    Injector injector = Guice.createInjector(
+        new MemStorageModule(),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(StatsProvider.class).to(FakeStatsProvider.class);
+            bind(FakeStatsProvider.class).in(Singleton.class);
+          }
+        });
+
+    Storage storage = injector.getInstance(Storage.class);
+    storage.prepare();
+    return storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
deleted file mode 100644
index c28fb65..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import com.google.common.base.Function;
-
-import org.apache.thrift.TBase;
-
-/**
- * Utility class for common operations amongst in-memory store implementations.
- */
-final class Util {
-
-  private Util() {
-    // Utility class.
-  }
-
-  /**
-   * Creates a function that performs a 'deep copy' on a thrift struct of a specific type.  The
-   * resulting copied objects will be exactly identical to the original.  Mutations to the original
-   * object will not be reflected in the copy, and vice versa.
-   *
-   * @return A copier for the provided type of thrift structs.
-   */
-  static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
-    return input -> {
-      if (input == null) {
-        return null;
-      }
-
-      @SuppressWarnings("unchecked")
-      T t = (T) input.deepCopy();
-      return t;
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 3d35e97..27c0b43 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -77,8 +77,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
@@ -470,14 +468,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
     changeJobUpdateStatus(storeProvider, key, event, true);
   }
 
-  private static final Set<JobUpdateStatus> TERMINAL_STATES = ImmutableSet.of(
-      ROLLED_FORWARD,
-      ROLLED_BACK,
-      ABORTED,
-      JobUpdateStatus.FAILED,
-      ERROR
-  );
-
   private void changeJobUpdateStatus(
       MutableStoreProvider storeProvider,
       IJobUpdateKey key,
@@ -494,7 +484,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
           IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
     }
 
-    if (TERMINAL_STATES.contains(status)) {
+    if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
       lockManager.releaseLock(key.getJob());
       pulseHandler.remove(key);
     } else {

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index efbc42c..67a0d5a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -218,7 +218,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         ImmutableList.<Module>builder()
             .add(SchedulerMain.getUniversalModule(new CliOptions()))
             .add(new TierModule(TaskTestUtil.TIER_CONFIG))
-            .add(new LogStorageModule(new LogStorageModule.Options(), false))
+            .add(new LogStorageModule(new LogStorageModule.Options()))
             .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH))
             .add(testModule)
             .build()

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 3dee55a..c639ab6 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -15,18 +15,15 @@ package org.apache.aurora.scheduler.app.local;
 
 import java.io.File;
 import java.util.List;
-import java.util.Set;
 
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
 import com.google.inject.Module;
-import com.google.inject.TypeLiteral;
 import com.google.inject.util.Modules;
 
 import org.apache.aurora.scheduler.TierModule;
@@ -40,8 +37,6 @@ import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.HydrateSnapshotFields;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.v1.Protos;
 import org.apache.shiro.io.ResourceUtils;
@@ -79,16 +74,12 @@ public final class LocalSchedulerMain {
         .add("-shiro_ini_path="
             + ResourceUtils.CLASSPATH_PREFIX
             + "org/apache/aurora/scheduler/http/api/security/shiro-example.ini")
-        .add("-enable_h2_console=true")
         .build();
     CliOptions options = CommandLine.parseOptions(arguments.toArray(new String[] {}));
 
     Module persistentStorage = new AbstractModule() {
       @Override
       protected void configure() {
-        bind(new TypeLiteral<Boolean>() { })
-            .annotatedWith(SnapshotStoreImpl.ExperimentalTaskStore.class)
-            .toInstance(false);
         bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
         bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
         bind(DistributedSnapshotStore.class).toInstance(snapshot -> { });
@@ -108,8 +99,6 @@ public final class LocalSchedulerMain {
         bind(FrameworkInfoFactory.class).to(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class);
         bind(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class).in(Singleton.class);
         install(new ClusterSimulatorModule());
-        bind(new TypeLiteral<Set<String>>() { }).annotatedWith(HydrateSnapshotFields.class)
-            .toInstance(ImmutableSet.of());
       }
     };
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index f7c945d..244422c 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -29,7 +29,6 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.inject.AbstractModule;
@@ -170,17 +169,9 @@ public class CommandLineTest {
     expected.updater.enableAffinity = true;
     expected.updater.affinityExpiration = TEST_TIME;
     expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
-    expected.db.useDbTaskStore = true;
-    expected.db.enableDbMetrics = false;
-    expected.db.slowQueryLogThreshold = TEST_TIME;
-    expected.db.dbRowGcInterval = TEST_TIME;
-    expected.db.h2LockTimeout = TEST_TIME;
-    expected.db.mybatisMaxActiveConnectionCount = 42;
-    expected.db.mybatisMaxIdleConnectionCount = 42;
     expected.logStorage.shutdownGracePeriod = TEST_TIME;
     expected.logStorage.snapshotInterval = TEST_TIME;
     expected.logStorage.maxLogEntrySize = TEST_DATA;
-    expected.logStorage.hydrateSnapshotFields = ImmutableSet.of("testing");
     expected.backup.backupInterval = TEST_TIME;
     expected.backup.maxSavedBackups = 42;
     expected.backup.backupDir = new File("testing");
@@ -225,7 +216,6 @@ public class CommandLineTest {
     expected.iniShiroRealm.shiroIniPath = testIni;
     expected.iniShiroRealm.shiroCredentialsMatcher = AllowAllCredentialsMatcher.class;
     expected.api.enableCorsFor = "testing";
-    expected.h2Console.enableH2Console = true;
     expected.preemptor.enablePreemptor = false;
     expected.preemptor.preemptionDelay = TEST_TIME;
     expected.preemptor.preemptionSlotHoldTime = TEST_TIME;
@@ -323,17 +313,9 @@ public class CommandLineTest {
         "-enable_update_affinity=true",
         "-update_affinity_reservation_hold_time=42days",
         "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
-        "-use_beta_db_task_store=true",
-        "-enable_db_metrics=false",
-        "-slow_query_log_threshold=42days",
-        "-db_row_gc_interval=42days",
-        "-db_lock_timeout=42days",
-        "-db_max_active_connection_count=42",
-        "-db_max_idle_connection_count=42",
         "-dlog_shutdown_grace_period=42days",
         "-dlog_snapshot_interval=42days",
         "-dlog_max_entry_size=42GB",
-        "-snapshot_hydrate_stores=testing",
         "-backup_interval=42days",
         "-max_saved_backups=42",
         "-backup_dir=testing",
@@ -366,7 +348,6 @@ public class CommandLineTest {
         "-shiro_credentials_matcher="
             + "org.apache.shiro.authc.credential.AllowAllCredentialsMatcher",
         "-enable_cors_for=testing",
-        "-enable_h2_console=true",
         "-enable_preemptor=false",
         "-preemption_delay=42days",
         "-preemption_slot_hold_time=42days",

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 2b81707..742a97c 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -33,8 +33,8 @@ import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.easymock.Capture;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,7 +65,7 @@ public class AuroraCronJobTest extends EasyMockTest {
 
   @Before
   public void setUp() throws Exception {
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     stateManager = createMock(StateManager.class);
     backoffHelper = createMock(BackoffHelper.class);
     context = createMock(JobExecutionContext.class);
@@ -101,11 +101,11 @@ public class AuroraCronJobTest extends EasyMockTest {
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
     auroraCronJob.doExecute(context);
 
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     populateStorage(CronCollisionPolicy.KILL_EXISTING);
     auroraCronJob.doExecute(context);
 
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     populateStorage(CronCollisionPolicy.RUN_OVERLAP);
     auroraCronJob.doExecute(context);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1ed6a7b..0fabb33 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -39,9 +39,9 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.junit.Before;
 import org.junit.Test;
 import org.quartz.JobExecutionContext;
@@ -79,7 +79,7 @@ public class CronIT extends EasyMockTest {
   @Before
   public void setUp() throws Exception {
     stateManager = createMock(StateManager.class);
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     auroraCronJob = createMock(AuroraCronJob.class);
 
     injector = Guice.createInjector(

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index 81440f5..439cf3e 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -31,9 +31,9 @@ import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +59,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     scheduler = createMock(Scheduler.class);
 
     cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
index f94b58b..cb037bd 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
@@ -26,9 +26,9 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,7 +44,7 @@ public class MaintenanceTest {
 
   @Before
   public void setUp() {
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     maintenance = new Maintenance(storage);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index af6bdff..7a4525a 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -51,9 +51,9 @@ import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
@@ -310,7 +310,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   @Test
   public void testIgnoresThrottledTasks() throws Exception {
     // Ensures that tasks in THROTTLED state are not considered part of the active job state.
-    Storage memStorage = DbUtil.createStorage();
+    Storage memStorage = MemStorageModule.newEmptyStorage();
 
     Injector injector = getInjector(memStorage);
     scheduler = injector.getInstance(TaskScheduler.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index c56b6f9..8e19794 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -23,9 +23,9 @@ import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -55,7 +55,7 @@ public class LockManagerImplTest extends EasyMockTest {
     UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class);
     expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
 
-    lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, tokenGenerator);
+    lockManager = new LockManagerImpl(MemStorageModule.newEmptyStorage(), clock, tokenGenerator);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 24176fe..0366cd6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -51,11 +51,11 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.mesos.v1.Protos.AgentID;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -111,7 +111,7 @@ public class StateManagerImplTest extends EasyMockTest {
     eventSink = createMock(EventSink.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     // TODO(William Farner): Use a mocked storage.
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     stateManager = new StateManagerImpl(
         clock,
         driver,

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index d73656d..04350fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -35,10 +35,10 @@ import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -74,7 +74,7 @@ public class ResourceCounterTest {
 
   @Before
   public void setUp() throws Exception {
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
     resourceCounter = new ResourceCounter(storage);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
new file mode 100644
index 0000000..34db54b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractAttributeStoreTest {
+
+  private static final String HOST_A = "hostA";
+  private static final String HOST_B = "hostB";
+  private static final String SLAVE_A = "slaveA";
+  private static final String SLAVE_B = "slaveB";
+  private static final Attribute ATTR1 = new Attribute("attr1", ImmutableSet.of("a", "b", "c"));
+  private static final Attribute ATTR2 = new Attribute("attr2", ImmutableSet.of("d", "e", "f"));
+  private static final Attribute ATTR3 = new Attribute("attr3", ImmutableSet.of("a", "d", "g"));
+  private static final IHostAttributes HOST_A_ATTRS =
+      IHostAttributes.build(new HostAttributes(HOST_A, ImmutableSet.of(ATTR1, ATTR2))
+          .setSlaveId(SLAVE_A)
+          .setAttributes(ImmutableSet.of())  // NOTE(review): seems to replace the ctor's set.
+          .setMode(MaintenanceMode.NONE));
+  private static final IHostAttributes HOST_B_ATTRS =
+      IHostAttributes.build(new HostAttributes(HOST_B, ImmutableSet.of(ATTR2, ATTR3))
+          .setSlaveId(SLAVE_B)
+          .setAttributes(ImmutableSet.of())  // NOTE(review): same pattern as above - confirm.
+          .setMode(MaintenanceMode.DRAINING));
+
+  private Storage storage;  // Reassigned from createStorage() before every test.
+
+  @Before
+  public void setUp() throws IOException {
+    storage = createStorage();
+  }
+
+  protected abstract Storage createStorage();  // Supplies the storage these tests exercise.
+
+  @Test
+  public void testCrud() {  // Insert, overwrite, read back, then delete everything.
+    assertEquals(Optional.absent(), read(HOST_A));
+    assertEquals(ImmutableSet.of(), readAll());
+
+    insert(HOST_A_ATTRS);
+    assertEquals(Optional.of(HOST_A_ATTRS), read(HOST_A));
+    assertEquals(ImmutableSet.of(HOST_A_ATTRS), readAll());
+
+    insert(HOST_B_ATTRS);
+    insert(HOST_B_ATTRS);  // Double insert should be allowed.
+    assertEquals(Optional.of(HOST_B_ATTRS), read(HOST_B));
+    assertEquals(ImmutableSet.of(HOST_A_ATTRS, HOST_B_ATTRS), readAll());
+
+    IHostAttributes updatedA = IHostAttributes.build(
+        HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of(ATTR1, ATTR3)));
+    insert(updatedA);
+    assertEquals(Optional.of(updatedA), read(HOST_A));
+    assertEquals(ImmutableSet.of(updatedA, HOST_B_ATTRS), readAll());
+
+    IHostAttributes updatedMode = IHostAttributes.build(updatedA.newBuilder().setMode(DRAINED));
+    insert(updatedMode);
+    assertEquals(Optional.of(updatedMode), read(HOST_A));
+    assertEquals(ImmutableSet.of(updatedMode, HOST_B_ATTRS), readAll());
+
+    truncate();
+    assertEquals(Optional.absent(), read(HOST_A));
+    assertEquals(ImmutableSet.of(), readAll());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyAttributeValues() {  // An attribute with no values must be rejected.
+    IHostAttributes attributes = IHostAttributes.build(HOST_A_ATTRS.newBuilder()
+        .setAttributes(ImmutableSet.of(new Attribute("attr1", ImmutableSet.of()))));
+    insert(attributes);
+  }
+
+  @Test
+  public void testNoAttributes() {  // An empty attribute set is accepted and read back.
+    IHostAttributes attributes = IHostAttributes.build(
+        HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of()));
+    insert(attributes);
+    assertEquals(Optional.of(attributes), read(HOST_A));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoMode() {  // Saving with the mode unset must be rejected.
+    HostAttributes noMode = HOST_A_ATTRS.newBuilder();
+    noMode.unsetMode();
+
+    insert(IHostAttributes.build(noMode));
+  }
+
+  @Test
+  public void testSaveAttributesEmpty() {  // Unset attributes are saved and read back equal.
+    HostAttributes attributes = HOST_A_ATTRS.newBuilder();
+    attributes.unsetAttributes();
+
+    insert(IHostAttributes.build(attributes));
+    assertEquals(Optional.of(IHostAttributes.build(attributes)), read(HOST_A));
+  }
+
+  @Test
+  public void testSlaveIdChanges() {  // Re-saving a host with a new slave ID overwrites it.
+    insert(HOST_A_ATTRS);
+    IHostAttributes updated = IHostAttributes.build(HOST_A_ATTRS.newBuilder().setSlaveId(SLAVE_B));
+    insert(updated);
+    assertEquals(Optional.of(updated), read(HOST_A));
+  }
+
+  @Test
+  public void testUpdateAttributesWithRelations() {
+    // Test for regression of AURORA-1379, where host attribute mutation performed a delete,
+    // violating foreign key constraints.
+    insert(HOST_A_ATTRS);
+
+    ScheduledTask builder = TaskTestUtil.makeTask("a", JobKeys.from("role", "env", "job"))
+        .newBuilder();
+    builder.getAssignedTask()
+        .setSlaveHost(HOST_A_ATTRS.getHost())
+        .setSlaveId(HOST_A_ATTRS.getSlaveId());
+    final IScheduledTask taskA = IScheduledTask.build(builder);  // Task placed on host A.
+
+    storage.write((NoResult.Quiet)
+        storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA)));
+
+    HostAttributes attributeBuilder = HOST_A_ATTRS.newBuilder().setMode(DRAINED);
+    attributeBuilder.addToAttributes(new Attribute("newAttr", ImmutableSet.of("a", "b")));
+    IHostAttributes hostAUpdated = IHostAttributes.build(attributeBuilder);
+    insert(hostAUpdated);  // Update host A while the saved task still refers to it.
+    assertEquals(Optional.of(hostAUpdated), read(HOST_A));
+  }
+
+  private void insert(IHostAttributes attributes) {  // Saves inside a storage write.
+    storage.write(
+        storeProvider -> storeProvider.getAttributeStore().saveHostAttributes(attributes));
+  }
+
+  private Optional<IHostAttributes> read(String host) {
+    return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host));
+  }
+
+  private Set<IHostAttributes> readAll() {
+    return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes());
+  }
+
+  private void truncate() {  // Deletes all host attributes.
+    storage.write(
+        (NoResult.Quiet) storeProvider -> storeProvider.getAttributeStore().deleteHostAttributes());
+  }
+}


Mime
View raw message