aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: AURORA-126: Fix regression causing secondary index to store duplicate tasks.
Date Tue, 28 Jan 2014 00:07:12 GMT
Updated Branches:
  refs/heads/master 3e59dfb19 -> c33d8378b


AURORA-126: Fix regression causing secondary index to store duplicate tasks.

Bugs closed: AURORA-126

Reviewed at https://reviews.apache.org/r/17432/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c33d8378
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c33d8378
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c33d8378

Branch: refs/heads/master
Commit: c33d8378b6d4d8403cbaaf7ad8678fc3689e46e5
Parents: 3e59dfb
Author: Bill Farner <wfarner@apache.org>
Authored: Mon Jan 27 16:05:47 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Mon Jan 27 16:05:47 2014 -0800

----------------------------------------------------------------------
 .../scheduler/storage/mem/MemTaskStore.java     | 53 ++++++++++----------
 .../scheduler/storage/mem/MemTaskStoreTest.java | 33 +++++++++++-
 2 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c33d8378/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index eaf18dc..796fd75 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -89,18 +89,19 @@ class MemTaskStore implements TaskStore.Mutable {
   // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage
   // lock is secured.  To adhere to that, these data structures are individually thread-safe, but
   // we don't lock across them because of the relaxed consistency guarantees.
-  // For this reason, the secondary indices store references to Task objects (as opposed to storing
-  // secondary to primary key mappings).  This ensures that in the face of weak consistency, query
-  // results are sane.  Otherwise, you could query for seconary key = v1 and get a result with
-  // secondary key value = v2.
+  // Note that this behavior makes it possible to receive query results that are not sane,
+  // specifically when a secondary key value is changed.  In other words, we currently don't always
+  // support the invariant that a query by slave host yields a result with all tasks matching that
+  // slave host.  This is deemed acceptable due to the fact that secondary key values are rarely
+  // mutated in practice, and mutated in ways that are not impacted by this behavior.
   private final Map<String, Task> tasks = Maps.newConcurrentMap();
   private final List<SecondaryIndex<?>> secondaryIndices = ImmutableList.of(
       new SecondaryIndex<>(
-          Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED),
+          Tasks.SCHEDULED_TO_JOB_KEY,
           QUERY_TO_JOB_KEY,
           Stats.exportLong("task_queries_by_job")),
       new SecondaryIndex<>(
-          Functions.compose(Tasks.SCHEDULED_TO_SLAVE_HOST, TO_SCHEDULED),
+          Tasks.SCHEDULED_TO_SLAVE_HOST,
           QUERY_TO_SLAVE_HOST,
           Stats.exportLong("task_queries_by_host")));
 
@@ -146,7 +147,7 @@ class MemTaskStore implements TaskStore.Mutable {
     Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
     tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
     for (SecondaryIndex<?> index : secondaryIndices) {
-      index.insert(canonicalized);
+      index.insert(Iterables.transform(canonicalized, TO_SCHEDULED));
     }
   }
 
@@ -169,7 +170,7 @@ class MemTaskStore implements TaskStore.Mutable {
       Task removed = tasks.remove(id);
       if (removed != null) {
         for (SecondaryIndex<?> index : secondaryIndices) {
-          index.remove(removed);
+          index.remove(removed.task);
         }
         configInterner.removeAssociation(removed.task.getAssignedTask().getTask().newBuilder(),
id);
       }
@@ -192,10 +193,9 @@ class MemTaskStore implements TaskStore.Mutable {
         Preconditions.checkState(
             Tasks.id(original.task).equals(Tasks.id(maybeMutated)),
             "A task's ID may not be mutated.");
-        Task newCanonicalTask = toTask.apply(maybeMutated);
-        tasks.put(Tasks.id(maybeMutated), newCanonicalTask);
+        tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
         for (SecondaryIndex<?> index : secondaryIndices) {
-          index.replace(original, newCanonicalTask);
+          index.replace(original.task, maybeMutated);
         }
 
         mutated.add(maybeMutated);
@@ -292,11 +292,12 @@ class MemTaskStore implements TaskStore.Mutable {
       from = Optional.of(fromIdIndex(query.get().getTaskIds()));
     } else {
       for (SecondaryIndex<?> index : secondaryIndices) {
-        from = index.getMatches(query);
-        if (from.isPresent()) {
+        Optional<Iterable<String>> indexMatch = index.getMatches(query);
+        if (indexMatch.isPresent()) {
           // Note: we could leverage multiple indexes here if the query applies to them, by
           // choosing to intersect the results.  Given current indexes and query profile, this is
           // unlikely to offer much improvement, though.
+          from = Optional.of(fromIdIndex(indexMatch.get()));
           break;
         }
       }
@@ -356,9 +357,9 @@ class MemTaskStore implements TaskStore.Mutable {
    * @param <K> Key type.
    */
   private static class SecondaryIndex<K> {
-    private final Multimap<K, Task> index =
-        Multimaps.synchronizedSetMultimap(HashMultimap.<K, Task>create());
-    private final Function<Task, K> indexer;
+    private final Multimap<K, String> index =
+        Multimaps.synchronizedSetMultimap(HashMultimap.<K, String>create());
+    private final Function<IScheduledTask, K> indexer;
     private final Function<Query.Builder, Optional<K>> queryExtractor;
     private final AtomicLong hitCount;
 
@@ -370,7 +371,7 @@ class MemTaskStore implements TaskStore.Mutable {
      * @param hitCount Counter for number of times the seconary index applies to a query.
      */
     SecondaryIndex(
-        Function<Task, K> indexer,
+        Function<IScheduledTask, K> indexer,
         Function<Query.Builder, Optional<K>> queryExtractor,
         AtomicLong hitCount) {
 
@@ -379,16 +380,16 @@ class MemTaskStore implements TaskStore.Mutable {
       this.hitCount = hitCount;
     }
 
-    void insert(Iterable<Task> tasks) {
-      for (Task task : tasks) {
+    void insert(Iterable<IScheduledTask> tasks) {
+      for (IScheduledTask task : tasks) {
         insert(task);
       }
     }
 
-    void insert(Task task) {
+    void insert(IScheduledTask task) {
       K key = indexer.apply(task);
       if (key != null) {
-        index.put(key, task);
+        index.put(key, Tasks.id(task));
       }
     }
 
@@ -396,28 +397,30 @@ class MemTaskStore implements TaskStore.Mutable {
       index.clear();
     }
 
-    void remove(Task task) {
+    void remove(IScheduledTask task) {
       K key = indexer.apply(task);
       if (key != null) {
-        index.remove(key, task);
+        // The index maps keys to task IDs, so remove by ID.  Passing the task object itself
+        // would never match a stored value and would leave a stale entry behind.
+        index.remove(key, Tasks.id(task));
       }
     }
 
-    void replace(Task old, Task replacement) {
+    void replace(IScheduledTask old, IScheduledTask replacement) {
       synchronized (index) {
         remove(old);
         insert(replacement);
       }
     }
 
-    private final Function<K, Iterable<Task>> lookup = new Function<K, Iterable<Task>>()
{
-      @Override public Iterable<Task> apply(K key) {
+    private final Function<K, Iterable<String>> lookup = new Function<K, Iterable<String>>()
{
+      @Override public Iterable<String> apply(K key) {
         hitCount.incrementAndGet();
         return index.get(key);
       }
     };
 
-    Optional<Iterable<Task>> getMatches(Query.Builder query) {
+    Optional<Iterable<String>> getMatches(Query.Builder query) {
       return queryExtractor.apply(query).transform(lookup);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c33d8378/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index ff5bf89..0955338 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -250,6 +250,12 @@ public class MemTaskStoreTest {
     return IScheduledTask.build(builder);
   }
 
+  private static IScheduledTask setConfigData(IScheduledTask task, String configData) {
+    ScheduledTask builder = task.newBuilder();
+    builder.getAssignedTask().getTask().getExecutorConfig().setData(configData);
+    return IScheduledTask.build(builder);
+  }
+
   @Test
   public void testAddSlaveHost() {
     final IScheduledTask a = makeTask("a", "role", "env", "job");
@@ -307,6 +313,30 @@ public class MemTaskStoreTest {
     assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
   }
 
+  @Test
+  public void testTasksOnSameHost() {
+    String host = "slaveA";
+    final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host));
+    final IScheduledTask b = setHost(makeTask("b", "role", "env", "job"), Optional.of(host));
+    store.saveTasks(ImmutableSet.of(a, b));
+    assertQueryResults(Query.slaveScoped(host), a, b);
+  }
+
+  @Test
+  public void testSaveOverwrites() {
+    // Ensures that saving a task with an existing task ID is effectively the same as a mutate,
+    // and does not result in a duplicate object in the primary or secondary index.
+
+    String host = "slaveA";
+    final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host));
+    store.saveTasks(ImmutableSet.of(a));
+
+    final IScheduledTask updated = setConfigData(a, "new config data");
+    store.saveTasks(ImmutableSet.of(updated));
+    assertQueryResults(Query.taskScoped(Tasks.id(a)), updated);
+    assertQueryResults(Query.slaveScoped(host), updated);
+  }
+
   private void assertStoreContents(IScheduledTask... tasks) {
     assertQueryResults(Query.unscoped(), tasks);
   }
@@ -330,7 +360,8 @@ public class MemTaskStoreTest {
             .setTask(new TaskConfig()
                 .setJobName(jobName)
                 .setEnvironment(env)
-                .setOwner(new Identity(role, role)))));
+                .setOwner(new Identity(role, role))
+                .setExecutorConfig(new ExecutorConfig().setData("executor config")))));
   }
 
   private static IScheduledTask makeTask(String id) {


Mime
View raw message