aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Reworked LogStorage to avoid unnecessary extra calls to write().
Date Tue, 25 Feb 2014 20:36:05 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 03d8d81e8 -> c07b799f1


Reworked LogStorage to avoid unnecessary extra calls to write().

Bugs closed: AURORA-134

Reviewed at https://reviews.apache.org/r/18241/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c07b799f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c07b799f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c07b799f

Branch: refs/heads/master
Commit: c07b799f1a79178667cafc0d1e0930aa9250ee66
Parents: 03d8d81
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Feb 25 12:32:34 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Feb 25 12:32:34 2014 -0800

----------------------------------------------------------------------
 .../scheduler/storage/log/LogStorage.java       | 231 ++++++++-----------
 .../storage/testing/StorageTestUtil.java        |   6 +-
 .../scheduler/storage/log/LogStorageTest.java   | 168 ++++++++------
 3 files changed, 193 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c07b799f/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 36e4082..fd1be9f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -34,6 +34,7 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -354,9 +355,14 @@ public class LogStorage extends ForwardingStore
         break;
 
       case TRANSACTION:
-        for (Op op : logEntry.getTransaction().getOps()) {
-          replayOp(op);
-        }
+        write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider unused) {
+            for (Op op : logEntry.getTransaction().getOps()) {
+              replayOp(op);
+            }
+          }
+        });
         break;
 
       case NOOP:
@@ -489,12 +495,7 @@ public class LogStorage extends ForwardingStore
     // The log stream transaction has already been set up so we just need to delegate with
our
     // store provider so any mutations performed by work get logged.
     if (transaction != null) {
-      return super.write(new MutateWork<T, E>() {
-        @Override
-        public T apply(MutableStoreProvider unused) throws E {
-          return work.apply(logStoreProvider);
-        }
-      });
+      return work.apply(logStoreProvider);
     }
 
     transaction = streamManager.startTransaction();
@@ -523,25 +524,20 @@ public class LogStorage extends ForwardingStore
   @Timed("scheduler_log_save_framework_id")
   @Override
   public void saveFrameworkId(final String frameworkId) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-        LogStorage.super.saveFrameworkId(frameworkId);
-      }
-    });
+    checkNotNull(frameworkId);
+
+    log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    super.saveFrameworkId(frameworkId);
   }
 
   @Timed("scheduler_log_job_save")
   @Override
   public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig)
{
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
-        LogStorage.super.saveAcceptedJob(managerId, jobConfig);
-      }
-    });
+    checkNotNull(managerId);
+    checkNotNull(jobConfig);
+
+    log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
+    super.saveAcceptedJob(managerId, jobConfig);
   }
 
   @Timed("scheduler_log_job_remove")
@@ -549,51 +545,35 @@ public class LogStorage extends ForwardingStore
   public void removeJob(final IJobKey jobKey) {
     checkNotNull(jobKey);
 
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-        LogStorage.super.removeJob(jobKey);
-      }
-    });
+    log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    super.removeJob(jobKey);
   }
 
   @Timed("scheduler_log_tasks_save")
   @Override
   public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException
{
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-        LogStorage.super.saveTasks(newTasks);
-      }
-    });
+    checkNotNull(newTasks);
+
+    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    super.saveTasks(newTasks);
   }
 
   @Override
   public void deleteAllTasks() {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        Query.Builder query = Query.unscoped();
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        deleteTasks(ids);
-      }
-    });
+    Query.Builder query = Query.unscoped();
+    Set<String> ids = FluentIterable.from(fetchTasks(query))
+        .transform(Tasks.SCHEDULED_TO_ID)
+        .toSet();
+    deleteTasks(ids);
   }
 
   @Timed("scheduler_log_tasks_remove")
   @Override
   public void deleteTasks(final Set<String> taskIds) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.removeTasks(new RemoveTasks(taskIds)));
-        LogStorage.super.deleteTasks(taskIds);
-      }
-    });
+    checkNotNull(taskIds);
+
+    log(Op.removeTasks(new RemoveTasks(taskIds)));
+    super.deleteTasks(taskIds);
   }
 
   @Timed("scheduler_log_tasks_mutate")
@@ -602,121 +582,102 @@ public class LogStorage extends ForwardingStore
       final Query.Builder query,
       final Function<IScheduledTask, IScheduledTask> mutator) {
 
-    return write(new MutateWork.Quiet<ImmutableSet<IScheduledTask>>() {
-      @Override
-      public ImmutableSet<IScheduledTask> apply(MutableStoreProvider unused) {
-        ImmutableSet<IScheduledTask> mutated = LogStorage.super.mutateTasks(query,
mutator);
+    checkNotNull(query);
+    checkNotNull(mutator);
 
-        Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine("Storing updated tasks to log: "
-              + Maps.transformValues(tasksById, Tasks.GET_STATUS));
-        }
+    ImmutableSet<IScheduledTask> mutated = super.mutateTasks(query, mutator);
 
-        // TODO(William Farner): Avoid writing an op when mutated is empty.
-        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
-        return mutated;
-      }
-    });
+    Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
+    if (LOG.isLoggable(Level.FINE)) {
+      LOG.fine("Storing updated tasks to log: "
+          + Maps.transformValues(tasksById, Tasks.GET_STATUS));
+    }
+
+    // TODO(William Farner): Avoid writing an op when mutated is empty.
+    log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
+    return mutated;
   }
 
   @Timed("scheduler_log_unsafe_modify_in_place")
   @Override
   public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration)
{
-    return write(new MutateWork.Quiet<Boolean>() {
-      @Override
-      public Boolean apply(MutableStoreProvider storeProvider) {
-        boolean mutated = LogStorage.super.unsafeModifyInPlace(taskId, taskConfiguration);
-        if (mutated) {
-          log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
-        }
-        return mutated;
-      }
-    });
+    checkNotNull(taskId);
+    checkNotNull(taskConfiguration);
+
+    boolean mutated = super.unsafeModifyInPlace(taskId, taskConfiguration);
+    if (mutated) {
+      log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
+    }
+    return mutated;
   }
 
   @Timed("scheduler_log_quota_remove")
   @Override
   public void removeQuota(final String role) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.removeQuota(new RemoveQuota(role)));
-        LogStorage.super.removeQuota(role);
-      }
-    });
+    checkNotNull(role);
+
+    log(Op.removeQuota(new RemoveQuota(role)));
+    super.removeQuota(role);
   }
 
   @Timed("scheduler_log_quota_save")
   @Override
   public void saveQuota(final String role, final IQuota quota) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-        LogStorage.super.saveQuota(role, quota);
-      }
-    });
+    checkNotNull(role);
+    checkNotNull(quota);
+
+    log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    super.saveQuota(role, quota);
   }
 
   @Timed("scheduler_save_host_attribute")
   @Override
   public void saveHostAttributes(final HostAttributes attrs) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        // Pass the updated attributes upstream, and then check if the stored value changes.
-        // We do this since different parts of the system write partial HostAttributes objects
-        // and they are merged together internally.
-        // TODO(William Farner): Split out a separate method
-        //                       saveAttributes(String host, Iterable<Attributes>)
to simplify this.
-        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(attrs.getHost());
-        LogStorage.super.saveHostAttributes(attrs);
-        Optional<HostAttributes> updated = LogStorage.super.getHostAttributes(attrs.getHost());
-        if (!saved.equals(updated)) {
-          log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
-        }
-      }
-    });
+    checkNotNull(attrs);
+
+    // Pass the updated attributes upstream, and then check if the stored value changes.
+    // We do this since different parts of the system write partial HostAttributes objects
+    // and they are merged together internally.
+    // TODO(William Farner): Split out a separate method
+    //                       saveAttributes(String host, Iterable<Attributes>) to simplify
this.
+    Optional<HostAttributes> saved = getHostAttributes(attrs.getHost());
+    super.saveHostAttributes(attrs);
+    Optional<HostAttributes> updated = getHostAttributes(attrs.getHost());
+    if (!saved.equals(updated)) {
+      log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
+    }
   }
 
   @Timed("scheduler_lock_save")
   @Override
   public void saveLock(final ILock lock) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.saveLock(new SaveLock(lock.newBuilder())));
-        LogStorage.super.saveLock(lock);
-      }
-    });
+    checkNotNull(lock);
+
+    log(Op.saveLock(new SaveLock(lock.newBuilder())));
+    super.saveLock(lock);
   }
 
   @Timed("scheduler_lock_remove")
   @Override
   public void removeLock(final ILockKey lockKey) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
-        LogStorage.super.removeLock(lockKey);
-      }
-    });
+    checkNotNull(lockKey);
+
+    log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
+    super.removeLock(lockKey);
   }
 
   @Override
   public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider unused) {
-        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(host);
-        if (saved.isPresent()) {
-          HostAttributes attributes = saved.get().setMode(mode);
-          log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
-          LogStorage.super.saveHostAttributes(attributes);
-        }
-      }
-    });
+    checkNotNull(host);
+    checkNotNull(mode);
+
+    Optional<HostAttributes> saved = getHostAttributes(host);
+    if (saved.isPresent()) {
+      HostAttributes attributes = saved.get().setMode(mode);
+      log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
+      super.saveHostAttributes(attributes);
+      return true;
+    }
     return false;
   }
 
@@ -735,6 +696,10 @@ public class LogStorage extends ForwardingStore
   }
 
   private void log(Op op) {
+    Preconditions.checkState(
+        !recovered || (transaction != null),
+        "Mutating operations must be done during recovery or within a transaction.");
+
     if (recovered) {
       transaction.add(op);
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c07b799f/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 005bad2..043dae6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -74,7 +74,7 @@ public class StorageTestUtil {
     this.storage = easyMock.createMock(NonVolatileStorage.class);
   }
 
-  private <T> IExpectationSetters<T> expectConsistentRead() {
+  public <T> IExpectationSetters<T> expectConsistentRead() {
     final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
     return expect(storage.consistentRead(capture(work))).andAnswer(new IAnswer<T>()
{
       @Override
@@ -84,7 +84,7 @@ public class StorageTestUtil {
     });
   }
 
-  private <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
+  public <T> IExpectationSetters<T> expectWeaklyConsistentRead() {
     final Capture<Work<T, RuntimeException>> work = EasyMockTest.createCapture();
     return expect(storage.weaklyConsistentRead(capture(work))).andAnswer(new IAnswer<T>()
{
       @Override
@@ -94,7 +94,7 @@ public class StorageTestUtil {
     });
   }
 
-  private <T> IExpectationSetters<T> expectWriteOperation() {
+  public <T> IExpectationSetters<T> expectWriteOperation() {
     final Capture<MutateWork<T, RuntimeException>> work = EasyMockTest.createCapture();
     return expect(storage.write(capture(work))).andAnswer(new IAnswer<T>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c07b799f/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index f708ab3..1f02e84 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -70,6 +70,7 @@ import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -244,10 +245,10 @@ public class LogStorageTest extends EasyMockTest {
     shutdownStream.getValue().execute();
   }
 
-  abstract class MutationFixture {
+  abstract class StorageTestFixture {
     private final AtomicBoolean runCalled = new AtomicBoolean(false);
 
-    MutationFixture() {
+    StorageTestFixture() {
       // Prevent otherwise silent noop tests that forget to call run().
       addTearDown(new TearDown() {
         @Override
@@ -306,11 +307,28 @@ public class LogStorageTest extends EasyMockTest {
       logStorage.start(initializationLogic);
 
       // Exercise the system.
-      performMutations();
+      runTest();
     }
 
-    protected abstract void setupExpectations() throws Exception;
-    protected abstract void performMutations();
+    protected void setupExpectations() throws Exception {
+      // Default to no expectations.
+    }
+
+    protected abstract void runTest();
+  }
+
+  abstract class MutationFixture extends StorageTestFixture {
+    @Override
+    protected void runTest() {
+      logStorage.write(new MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(MutableStoreProvider storeProvider) {
+          performMutations(storeProvider);
+        }
+      });
+    }
+
+    protected abstract void performMutations(MutableStoreProvider storeProvider);
   }
 
   @Test
@@ -319,15 +337,15 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws CodingException {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.schedulerStore.saveFrameworkId(frameworkId);
         streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)))
             .andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveFrameworkId(frameworkId);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
       }
     }.run();
   }
@@ -340,7 +358,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.jobStore.saveAcceptedJob(managerId, jobConfig);
         streamMatcher.expectTransaction(
             Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())))
@@ -348,8 +366,8 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveAcceptedJob(managerId, jobConfig);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().saveAcceptedJob(managerId, jobConfig);
       }
     }.run();
   }
@@ -360,7 +378,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.jobStore.removeJob(jobKey);
         streamMatcher.expectTransaction(
             Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())))
@@ -368,8 +386,8 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.removeJob(jobKey);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().removeJob(jobKey);
       }
     }.run();
   }
@@ -380,7 +398,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.taskStore.saveTasks(tasks);
         streamMatcher.expectTransaction(
             Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
@@ -388,8 +406,8 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveTasks(tasks);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
       }
     }.run();
   }
@@ -403,7 +421,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
         streamMatcher.expectTransaction(
             Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
@@ -411,8 +429,8 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        assertEquals(mutated, logStorage.mutateTasks(query, mutation));
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
       }
     }.run();
   }
@@ -426,7 +444,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         expect(storageUtil.taskStore.unsafeModifyInPlace(taskId2, updatedConfig)).andReturn(false);
         expect(storageUtil.taskStore.unsafeModifyInPlace(taskId, updatedConfig)).andReturn(true);
         streamMatcher.expectTransaction(
@@ -435,9 +453,9 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.unsafeModifyInPlace(taskId2, updatedConfig);
-        logStorage.unsafeModifyInPlace(taskId, updatedConfig);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(taskId2, updatedConfig);
+        storeProvider.getUnsafeTaskStore().unsafeModifyInPlace(taskId, updatedConfig);
       }
     }.run();
   }
@@ -453,7 +471,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
 
         storageUtil.taskStore.deleteTasks(tasksToRemove);
@@ -465,17 +483,13 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+
         logStorage.write(new MutateWork.NoResult.Quiet() {
           @Override
-          protected void execute(MutableStoreProvider provider) {
-            assertEquals(mutated, provider.getUnsafeTaskStore().mutateTasks(query, mutation));
-            logStorage.write(new MutateWork.NoResult.Quiet() {
-              @Override
-              protected void execute(MutableStoreProvider innerProvider) {
-                innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove);
-              }
-            });
+          protected void execute(MutableStoreProvider innerProvider) {
+            innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove);
           }
         });
       }
@@ -492,7 +506,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
@@ -505,14 +519,9 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider provider) {
-            logStorage.saveTasks(saved);
-            assertEquals(mutated, logStorage.mutateTasks(query, mutation));
-          }
-        });
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
       }
     }.run();
   }
@@ -527,7 +536,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
@@ -544,14 +553,9 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider provider) {
-            logStorage.saveTasks(saved);
-            assertEquals(mutated, logStorage.mutateTasks(query, mutation));
-          }
-        });
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(saved);
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
       }
     }.run();
   }
@@ -563,7 +567,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(ImmutableSet.of(task));
         storageUtil.taskStore.deleteTasks(taskIds);
         streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
@@ -571,8 +575,8 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.deleteAllTasks();
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteAllTasks();
       }
     }.run();
   }
@@ -583,15 +587,15 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.taskStore.deleteTasks(taskIds);
         streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
             .andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.deleteTasks(taskIds);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
       }
     }.run();
   }
@@ -603,15 +607,15 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.quotaStore.saveQuota(role, quota);
         streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, quota.newBuilder())))
             .andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveQuota(role, quota);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().saveQuota(role, quota);
       }
     }.run();
   }
@@ -622,14 +626,14 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.quotaStore.removeQuota(role);
         streamMatcher.expectTransaction(Op.removeQuota(new RemoveQuota(role))).andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.removeQuota(role);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getQuotaStore().removeQuota(role);
       }
     }.run();
   }
@@ -644,15 +648,15 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.lockStore.saveLock(lock);
         streamMatcher.expectTransaction(Op.saveLock(new SaveLock(lock.newBuilder())))
             .andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveLock(lock);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getLockStore().saveLock(lock);
       }
     }.run();
   }
@@ -664,15 +668,15 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         storageUtil.lockStore.removeLock(lockKey);
         streamMatcher.expectTransaction(Op.removeLock(new RemoveLock(lockKey.newBuilder())))
             .andReturn(position);
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.removeLock(lockKey);
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getLockStore().removeLock(lockKey);
       }
     }.run();
   }
@@ -689,7 +693,7 @@ public class LogStorageTest extends EasyMockTest {
     new MutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
-        storageUtil.expectOperations();
+        storageUtil.expectWriteOperation();
         expect(storageUtil.attributeStore.getHostAttributes(host))
             .andReturn(Optional.<HostAttributes>absent());
 
@@ -708,11 +712,23 @@ public class LogStorageTest extends EasyMockTest {
       }
 
       @Override
-      protected void performMutations() {
-        logStorage.saveHostAttributes(hostAttributes.get());
-        assertEquals(hostAttributes, logStorage.getHostAttributes(host));
-        logStorage.saveHostAttributes(hostAttributes.get());
-        assertEquals(hostAttributes, logStorage.getHostAttributes(host));
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        AttributeStore.Mutable store = storeProvider.getAttributeStore();
+        store.saveHostAttributes(hostAttributes.get());
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+        store.saveHostAttributes(hostAttributes.get());
+        assertEquals(hostAttributes, store.getHostAttributes(host));
+      }
+    }.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMutateRequiresWriteOperation() throws Exception {
+    new StorageTestFixture() {
+
+      @Override
+      protected void runTest() {
+        logStorage.deleteTasks(ImmutableSet.of("a"));
       }
     }.run();
   }


Mime
View raw message