aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject aurora git commit: TaskConfigManager: de-dupe task configs when inserting.
Date Thu, 09 Jul 2015 21:56:56 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 3e85d5c50 -> 4a4b11c43


TaskConfigManager: de-dupe task configs when inserting.

Bugs closed: AURORA-1392

Reviewed at https://reviews.apache.org/r/36372/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4a4b11c4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4a4b11c4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4a4b11c4

Branch: refs/heads/master
Commit: 4a4b11c436d0022ccfa4fe18e0726918682bdf1f
Parents: 3e85d5c
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Jul 9 14:56:42 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Jul 9 14:56:42 2015 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/base/TaskTestUtil.java     |  6 ++-
 .../scheduler/storage/db/DbCronJobStore.java    |  2 +-
 .../scheduler/storage/db/DbTaskStore.java       | 43 +-------------------
 .../scheduler/storage/db/TaskConfigManager.java | 23 ++++++++++-
 .../storage/AbstractCronJobStoreTest.java       | 32 +++++++++++++++
 5 files changed, 61 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 2f42851..c16aa4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -79,6 +79,10 @@ public final class TaskTestUtil {
   }
 
   public static IScheduledTask makeTask(String id, IJobKey job) {
+    return makeTask(id, makeConfig(job));
+  }
+
+  public static IScheduledTask makeTask(String id, ITaskConfig config) {
     return IScheduledTask.build(new ScheduledTask()
         .setStatus(ScheduleStatus.PENDING)
         .setTaskEvents(ImmutableList.of(
@@ -94,7 +98,7 @@ public final class TaskTestUtil {
             .setInstanceId(2)
             .setTaskId(id)
             .setAssignedPorts(ImmutableMap.of("http", 1000))
-            .setTask(makeConfig(job).newBuilder())));
+            .setTask(config.newBuilder())));
   }
 
   public static IScheduledTask addStateTransition(

http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
index c1237b7..664255f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java
@@ -70,7 +70,7 @@ class DbCronJobStore implements CronJobStore.Mutable {
         @Override
         public JobConfiguration apply(CronJobWrapper row) {
           JobConfiguration job = row.getJob();
-          job.setTaskConfig(taskConfigManager.getConfigSaturator().apply(
+          job.setTaskConfig(taskConfigManager.getConfigHydrator().apply(
               new TaskConfigRow(row.getTaskConfigRowId(), job.getTaskConfig())));
           return job;
         }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
index f3f241d..0f56411 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
@@ -20,7 +20,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -33,7 +32,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
@@ -112,39 +110,6 @@ class DbTaskStore implements TaskStore.Mutable {
     return IJobKey.setFromBuilders(taskMapper.selectJobKeys());
   }
 
-  private static final Function<TaskConfigRow, Long> CONFIG_ID =
-      new Function<TaskConfigRow, Long>() {
-        @Override
-        public Long apply(TaskConfigRow row) {
-          return row.getId();
-        }
-      };
-
-  /**
-   * Computes an association between config table row ID and {@link ITaskConfig} object for all
-   * configs in the provided jobs.
-   *
-   * @param jobs Jobs to fetch task configs for.
-   * @return A mutable bi-map between row ID and task config.
-   */
-  private Map<ITaskConfig, Long> getTaskConfigRows(Set<IJobKey> jobs) {
-    Function<IJobKey, Iterable<TaskConfigRow>> getRows =
-        new Function<IJobKey, Iterable<TaskConfigRow>>() {
-          @Override
-          public Iterable<TaskConfigRow> apply(IJobKey job) {
-            return configManager.getConfigs(job);
-          }
-        };
-
-    Map<ITaskConfig, TaskConfigRow> rowsToIds =
-        FluentIterable.from(jobs)
-            .transformAndConcat(getRows)
-            .uniqueIndex(
-                Functions.compose(ITaskConfig.FROM_BUILDER, configManager.getConfigSaturator()));
-
-    return Maps.transformValues(rowsToIds, CONFIG_ID);
-  }
-
   @Timed("db_storage_save_tasks")
   @Override
   public void saveTasks(Set<IScheduledTask> tasks) {
@@ -166,12 +131,6 @@ class DbTaskStore implements TaskStore.Mutable {
           }
         });
 
-    // Seed the cache with known configs in the jobs being updated.
-    configCache.putAll(getTaskConfigRows(
-        FluentIterable.from(tasks)
-            .transform(Tasks.SCHEDULED_TO_JOB_KEY)
-            .toSet()));
-
     for (IScheduledTask task : tasks) {
       long configId = configCache.getUnchecked(task.getAssignedTask().getTask());
 
@@ -272,7 +231,7 @@ class DbTaskStore implements TaskStore.Mutable {
       filter = Predicates.alwaysTrue();
     }
 
-    final Function<TaskConfigRow, TaskConfig> configSaturator = configManager.getConfigSaturator();
+    final Function<TaskConfigRow, TaskConfig> configSaturator = configManager.getConfigHydrator();
     return FluentIterable.from(results)
         .transform(populateAssignedPorts)
         .transform(new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
index 39177fd..2ec6b2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.storage.db;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import javax.inject.Inject;
@@ -87,7 +88,7 @@ class TaskConfigManager {
    *
    * @return A function to populate relations in task configs.
    */
-  Function<TaskConfigRow, TaskConfig> getConfigSaturator() {
+  Function<TaskConfigRow, TaskConfig> getConfigHydrator() {
     // It appears that there is no way in mybatis to populate a field of type Map.  To work around
     // this, we need to manually perform the query and associate the elements.
     final LoadingCache<Long, Map<String, String>> taskLinkCache = CacheBuilder.newBuilder()
@@ -107,8 +108,28 @@ class TaskConfigManager {
     return Functions.compose(REPLACE_UNION_TYPES, linkPopulator);
   }
 
+  private Optional<Long> getConfigRow(ITaskConfig config) {
+    // We could optimize this slightly by first comparing the un-hydrated row and breaking early.
+
+    Iterable<TaskConfigRow> configsInJob = getConfigs(config.getJob());
+
+    Map<ITaskConfig, TaskConfigRow> rowsToIds =
+        FluentIterable.from(configsInJob)
+            .uniqueIndex(
+                Functions.compose(ITaskConfig.FROM_BUILDER, getConfigHydrator()));
+
+    return Optional.ofNullable(rowsToIds.get(config)).map(TaskConfigRow::getId);
+  }
+
   long insert(ITaskConfig config) {
     InsertResult configInsert = new InsertResult();
+
+    // Determine whether this config is already stored.
+    Optional<Long> existingRow = getConfigRow(config);
+    if (existingRow.isPresent()) {
+      return existingRow.get();
+    }
+
     jobKeyMapper.merge(config.getJob());
     configMapper.insert(config, configInsert);
     for (IConstraint constraint : config.getConstraints()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
index 8d9792f..c7329c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
@@ -24,14 +24,18 @@ import com.google.inject.Module;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil;
 import org.junit.Before;
@@ -103,6 +107,34 @@ public abstract class AbstractCronJobStoreTest {
   }
 
   @Test
+  public void testTaskConfigDedupe() {
+    // Test for regression of AURORA-1392.
+
+    final IScheduledTask instance = TaskTestUtil.makeTask("a", JOB_A.getTaskConfig());
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(instance));
+      }
+    });
+
+    saveAcceptedJob(JOB_A);
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().mutateTasks(Query.taskScoped(Tasks.id(instance)),
+            new TaskStore.Mutable.TaskMutation() {
+              @Override
+              public IScheduledTask apply(IScheduledTask task) {
+                return IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.RUNNING));
+              }
+            });
+      }
+    });
+  }
+
+  @Test
   public void testUpdate() {
     // Test for regression of AURORA-1390.  Updates are not normal, but are used in cases such as
     // backfilling fields upon storage recovery.


Mime
View raw message