aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [1/5] aurora git commit: Exclusively use Map-based in-memory stores for primary storage
Date Wed, 25 Oct 2017 06:34:15 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 38476abdf -> f2755e1cd


http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java
b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java
new file mode 100644
index 0000000..e8324ee
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStoreTest.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import org.apache.aurora.scheduler.storage.AbstractQuotaStoreTest;
+import org.apache.aurora.scheduler.storage.Storage;
+
+public class MemQuotaStoreTest extends AbstractQuotaStoreTest {
+  @Override
+  protected Storage createStorage() {
+    return MemStorageModule.newEmptyStorage();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
new file mode 100644
index 0000000..7a12dfa
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * TODO(William Farner): Wire a mechanism to allow verification of synchronized writers.
+ */
+public class MemStorageTest extends TearDownTestCase {
+
+  private ExecutorService executor;
+  private Storage storage;
+
+  @Before
+  public void setUp() {
+    executor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
+    addTearDown(() -> executor.shutdown());
+    storage = MemStorageModule.newEmptyStorage();
+  }
+
+  @Test
+  public void testConcurrentReaders() throws Exception {
+    // Validate that a slow read does not block another read.
+
+    final CountDownLatch slowReadStarted = new CountDownLatch(1);
+    final CountDownLatch slowReadFinished = new CountDownLatch(1);
+
+    Future<String> future = executor.submit(() -> storage.read((Quiet<String>)
storeProvider -> {
+      slowReadStarted.countDown();
+      try {
+        slowReadFinished.await();
+      } catch (InterruptedException e) {
+        fail(e.getMessage());
+      }
+      return "slowResult";
+    }));
+
+    slowReadStarted.await();
+
+    String fastResult = storage.read((Quiet<String>) storeProvider -> "fastResult");
+    assertEquals("fastResult", fastResult);
+    slowReadFinished.countDown();
+    assertEquals("slowResult", future.get());
+  }
+
+  private IScheduledTask makeTask(String taskId) {
+    return IScheduledTask.build(new ScheduledTask().setAssignedTask(
+        new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setOwner(new Identity().setUser("owner-" + taskId))
+                .setJob(new JobKey()
+                    .setRole("role-" + taskId)
+                    .setEnvironment("env-" + taskId)
+                    .setName("job-" + taskId)))));
+  }
+
+  private static class CustomException extends RuntimeException {
+  }
+
+  private <T, E extends RuntimeException> void expectWriteFail(MutateWork<T, E>
work) {
+    try {
+      storage.write(work);
+      fail("Expected a CustomException.");
+    } catch (CustomException e) {
+      // Expected.
+    }
+  }
+
+  private void expectTasks(final String... taskIds) {
+    storage.read((Work.Quiet<Void>) storeProvider -> {
+      Query.Builder query = Query.unscoped();
+      Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+          .transform(t -> t.getAssignedTask().getTaskId())
+          .toSet();
+      assertEquals(ImmutableSet.<String>builder().add(taskIds).build(), ids);
+      return null;
+    });
+  }
+
+  @Test
+  public void testOperations() {
+    expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+      throw new CustomException();
+    });
+    expectTasks("a", "b");
+
+    storage.write((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
+    });
+    expectTasks("a", "b");
+
+    expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().deleteAllTasks();
+      throw new CustomException();
+    });
+    expectTasks();
+
+    expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
+      throw new CustomException();
+    });
+    expectTasks("a");
+    storage.read((Work.Quiet<Void>) storeProvider -> {
+      assertEquals(
+          makeTask("a"),
+          Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(
+              Query.taskScoped("a"))));
+      return null;
+    });
+
+    // Nested transaction where inner transaction fails.
+    expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+      storage.write((MutateWork.NoResult.Quiet) storeProvider1 -> {
+        storeProvider1.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+        throw new CustomException();
+      });
+    });
+    expectTasks("a", "c", "d");
+
+    // Nested transaction where outer transaction fails.
+    expectWriteFail((MutateWork.NoResult.Quiet) storeProvider -> {
+      storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("c")));
+      storage.write((MutateWork.NoResult.Quiet) storeProvider12 -> {
+        storeProvider12.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d")));
+      });
+      throw new CustomException();
+    });
+    expectTasks("a", "c", "d");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
new file mode 100644
index 0000000..9e75c98
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.AbstractTaskStoreTest;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.db.InstrumentingInterceptor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+
+public class MemTaskStoreTest extends AbstractTaskStoreTest {
+
+  private FakeStatsProvider statsProvider;
+
+  @Override
+  protected Module getStorageModule() {
+    statsProvider = new FakeStatsProvider();
+    return Modules.combine(
+        new MemStorageModule(),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(StatsProvider.class).toInstance(statsProvider);
+
+            // bindings for mybatis interceptor
+            SlidingStats slidingStats = createMock(SlidingStats.class);
+            bind(InstrumentingInterceptor.class).toInstance(new InstrumentingInterceptor(
+                Clock.SYSTEM_CLOCK, s -> slidingStats
+            ));
+          }
+        });
+  }
+
+  @Test
+  public void testSecondaryIndexConsistency() {
+    storage.write((NoResult.Quiet) storeProvider -> {
+    // Test for regression of AURORA-1305.
+      TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+      taskStore.saveTasks(ImmutableSet.of(TASK_A));
+      taskStore.deleteTasks(Tasks.ids(TASK_A));
+      assertEquals(0L, statsProvider.getLongValue(MemTaskStore.getIndexSizeStatName("job")));
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
index 25f34e2..b7df14a 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/StorageTransactionTest.java
@@ -21,7 +21,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -34,7 +33,6 @@ import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +54,7 @@ public class StorageTransactionTest extends TearDownTestCase {
     executor = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
     addTearDown(() -> MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS));
-    storage = DbUtil.createStorage();
+    storage = MemStorageModule.newEmptyStorage();
   }
 
   @Test
@@ -124,9 +122,9 @@ public class StorageTransactionTest extends TearDownTestCase {
     }
 
     storage.read(storeProvider -> {
-      // If the previous write was under a transaction then there would be no quota records.
-      assertEquals(ImmutableMap.of(),
-          storeProvider.getQuotaStore().fetchQuotas());
+      // Since the previous write was not transactional, the quota record remains.
+      assertEquals(ImmutableSet.of("a"),
+          storeProvider.getQuotaStore().fetchQuotas().keySet());
       return null;
     });
   }
@@ -137,7 +135,9 @@ public class StorageTransactionTest extends TearDownTestCase {
       storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"), makeTask("b")));
       throw new CustomException();
     });
-    expectTasks();
+    // The in-memory storage is not transactional, so the writes are retained despite the
write
+    // operation failing.
+    expectTasks("a", "b");
 
     storage.write((NoResult.Quiet) storeProvider ->
         storeProvider.getUnsafeTaskStore().saveTasks(
@@ -148,7 +148,7 @@ public class StorageTransactionTest extends TearDownTestCase {
       storeProvider.getUnsafeTaskStore().deleteAllTasks();
       throw new CustomException();
     });
-    expectTasks("a", "b");
+    expectTasks();
 
     storage.write(
         (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
@@ -157,7 +157,7 @@ public class StorageTransactionTest extends TearDownTestCase {
       storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a")));
       throw new CustomException();
     });
-    expectTasks();
+    expectTasks("a");
 
     storage.write((NoResult.Quiet) storeProvider ->
         storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("a"))));
@@ -170,7 +170,7 @@ public class StorageTransactionTest extends TearDownTestCase {
         throw new CustomException();
       });
     });
-    expectTasks("a");
+    expectTasks("a", "c", "d");
 
     // Nested transaction where outer transaction fails.
     expectWriteFail(storeProvider -> {
@@ -179,6 +179,6 @@ public class StorageTransactionTest extends TearDownTestCase {
           storeProvider1.getUnsafeTaskStore().saveTasks(ImmutableSet.of(makeTask("d"))));
       throw new CustomException();
     });
-    expectTasks("a");
+    expectTasks("a", "c", "d");
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 1f30bfa..b2c371c 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -66,9 +66,9 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
-import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin;
 import org.apache.aurora.scheduler.thrift.aop.AopModule;
 import org.apache.mesos.v1.Protos.FrameworkInfo;
@@ -105,7 +105,7 @@ public class ThriftIT extends EasyMockTest {
             bind(CliOptions.class).toInstance(options);
             install(new LifecycleModule());
             install(new StatsModule(options.stats));
-            install(DbModule.testModule());
+            install(new MemStorageModule());
             install(new QuotaModule());
             install(new CronModule(options.cron));
             install(new TierModule(TaskTestUtil.TIER_CONFIG));

http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 0a76f53..4d62bba 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -82,7 +82,6 @@ import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -94,6 +93,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
@@ -183,7 +183,7 @@ public class JobUpdaterIT extends EasyMockTest {
 
     Injector injector = Guice.createInjector(
         new UpdaterModule(executor, options),
-        DbModule.testModuleWithWorkQueue(),
+        new MemStorageModule(),
         new AbstractModule() {
           @Override
           protected void configure() {


Mime
View raw message