Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F240418021 for ; Thu, 9 Jul 2015 21:57:01 +0000 (UTC) Received: (qmail 8007 invoked by uid 500); 9 Jul 2015 21:56:56 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 7972 invoked by uid 500); 9 Jul 2015 21:56:56 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 7963 invoked by uid 99); 9 Jul 2015 21:56:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jul 2015 21:56:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7C3CE6838; Thu, 9 Jul 2015 21:56:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Message-Id: <1f946c3bf6104e84a1c284da825a20a5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: TaskConfigManager: de-dupe task configs when inserting. Date: Thu, 9 Jul 2015 21:56:56 +0000 (UTC) Repository: aurora Updated Branches: refs/heads/master 3e85d5c50 -> 4a4b11c43 TaskConfigManager: de-dupe task configs when inserting. Bugs closed: AURORA-1392 Reviewed at https://reviews.apache.org/r/36372/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4a4b11c4 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4a4b11c4 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4a4b11c4 Branch: refs/heads/master Commit: 4a4b11c436d0022ccfa4fe18e0726918682bdf1f Parents: 3e85d5c Author: Bill Farner Authored: Thu Jul 9 14:56:42 2015 -0700 Committer: Bill Farner Committed: Thu Jul 9 14:56:42 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/base/TaskTestUtil.java | 6 ++- .../scheduler/storage/db/DbCronJobStore.java | 2 +- .../scheduler/storage/db/DbTaskStore.java | 43 +------------------- .../scheduler/storage/db/TaskConfigManager.java | 23 ++++++++++- .../storage/AbstractCronJobStoreTest.java | 32 +++++++++++++++ 5 files changed, 61 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java index 2f42851..c16aa4c 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java @@ -79,6 +79,10 @@ public final class TaskTestUtil { } public static IScheduledTask makeTask(String id, IJobKey job) { + return makeTask(id, makeConfig(job)); + } + + public static IScheduledTask makeTask(String id, ITaskConfig config) { return IScheduledTask.build(new ScheduledTask() .setStatus(ScheduleStatus.PENDING) .setTaskEvents(ImmutableList.of( @@ -94,7 +98,7 @@ public final class TaskTestUtil { .setInstanceId(2) .setTaskId(id) .setAssignedPorts(ImmutableMap.of("http", 1000)) - .setTask(makeConfig(job).newBuilder()))); + .setTask(config.newBuilder()))); } public static IScheduledTask addStateTransition( http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java index c1237b7..664255f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java @@ -70,7 +70,7 @@ class DbCronJobStore implements CronJobStore.Mutable { @Override public JobConfiguration apply(CronJobWrapper row) { JobConfiguration job = row.getJob(); - job.setTaskConfig(taskConfigManager.getConfigSaturator().apply( + job.setTaskConfig(taskConfigManager.getConfigHydrator().apply( new TaskConfigRow(row.getTaskConfigRowId(), job.getTaskConfig()))); return job; } http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java index f3f241d..0f56411 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java @@ -20,7 +20,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -33,7 +32,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.inject.Inject; import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.quantity.Amount; @@ -112,39 +110,6 @@ class DbTaskStore implements TaskStore.Mutable { return IJobKey.setFromBuilders(taskMapper.selectJobKeys()); } - private static final Function CONFIG_ID = - new Function() { - @Override - public Long apply(TaskConfigRow row) { - return row.getId(); - } - }; - - /** - * Computes an association between config table row ID and {@link ITaskConfig} object for all - * configs in the provided jobs. - * - * @param jobs Jobs to fetch task configs for. - * @return A mutable bi-map between row ID and task config. - */ - private Map getTaskConfigRows(Set jobs) { - Function> getRows = - new Function>() { - @Override - public Iterable apply(IJobKey job) { - return configManager.getConfigs(job); - } - }; - - Map rowsToIds = - FluentIterable.from(jobs) - .transformAndConcat(getRows) - .uniqueIndex( - Functions.compose(ITaskConfig.FROM_BUILDER, configManager.getConfigSaturator())); - - return Maps.transformValues(rowsToIds, CONFIG_ID); - } - @Timed("db_storage_save_tasks") @Override public void saveTasks(Set tasks) { @@ -166,12 +131,6 @@ class DbTaskStore implements TaskStore.Mutable { } }); - // Seed the cache with known configs in the jobs being updated. - configCache.putAll(getTaskConfigRows( - FluentIterable.from(tasks) - .transform(Tasks.SCHEDULED_TO_JOB_KEY) - .toSet())); - for (IScheduledTask task : tasks) { long configId = configCache.getUnchecked(task.getAssignedTask().getTask()); @@ -272,7 +231,7 @@ class DbTaskStore implements TaskStore.Mutable { filter = Predicates.alwaysTrue(); } - final Function configSaturator = configManager.getConfigSaturator(); + final Function configSaturator = configManager.getConfigHydrator(); return FluentIterable.from(results) .transform(populateAssignedPorts) .transform(new Function() { http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java index 39177fd..2ec6b2d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.storage.db; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import javax.inject.Inject; @@ -87,7 +88,7 @@ class TaskConfigManager { * * @return A function to populate relations in task configs. */ - Function getConfigSaturator() { + Function getConfigHydrator() { // It appears that there is no way in mybatis to populate a field of type Map. To work around // this, we need to manually perform the query and associate the elements. final LoadingCache> taskLinkCache = CacheBuilder.newBuilder() @@ -107,8 +108,28 @@ class TaskConfigManager { return Functions.compose(REPLACE_UNION_TYPES, linkPopulator); } + private Optional getConfigRow(ITaskConfig config) { + // We could optimize this slightly by first comparing the un-hydrated row and breaking early. + + Iterable configsInJob = getConfigs(config.getJob()); + + Map rowsToIds = + FluentIterable.from(configsInJob) + .uniqueIndex( + Functions.compose(ITaskConfig.FROM_BUILDER, getConfigHydrator())); + + return Optional.ofNullable(rowsToIds.get(config)).map(TaskConfigRow::getId); + } + long insert(ITaskConfig config) { InsertResult configInsert = new InsertResult(); + + // Determine whether this config is already stored. + Optional existingRow = getConfigRow(config); + if (existingRow.isPresent()) { + return existingRow.get(); + } + jobKeyMapper.merge(config.getJob()); configMapper.insert(config, configInsert); for (IConstraint constraint : config.getConstraints()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4a4b11c4/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java index 8d9792f..c7329c9 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java @@ -24,14 +24,18 @@ import com.google.inject.Module; import org.apache.aurora.gen.CronCollisionPolicy; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; import org.junit.Before; @@ -103,6 +107,34 @@ public abstract class AbstractCronJobStoreTest { } @Test + public void testTaskConfigDedupe() { + // Test for regression of AURORA-1392. + + final IScheduledTask instance = TaskTestUtil.makeTask("a", JOB_A.getTaskConfig()); + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(instance)); + } + }); + + saveAcceptedJob(JOB_A); + + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getUnsafeTaskStore().mutateTasks(Query.taskScoped(Tasks.id(instance)), + new TaskStore.Mutable.TaskMutation() { + @Override + public IScheduledTask apply(IScheduledTask task) { + return IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.RUNNING)); + } + }); + } + }); + } + + @Test public void testUpdate() { // Test for regression of AURORA-1390. Updates are not normal, but are used in cases such as // backfilling fields upon storage recovery.