Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 481CE10F19 for ; Wed, 18 Feb 2015 01:35:45 +0000 (UTC) Received: (qmail 10116 invoked by uid 500); 18 Feb 2015 01:35:45 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 10078 invoked by uid 500); 18 Feb 2015 01:35:45 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 10069 invoked by uid 99); 18 Feb 2015 01:35:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Feb 2015 01:35:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 18 Feb 2015 01:35:40 +0000 Received: (qmail 10001 invoked by uid 99); 18 Feb 2015 01:35:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Feb 2015 01:35:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3898AE03F7; Wed, 18 Feb 2015 01:35:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-aurora git commit: Uniquely identify job updates in the database by JobUpdateKey. Date: Wed, 18 Feb 2015 01:35:20 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 4b43305b3 -> ec66a5ec6 Uniquely identify job updates in the database by JobUpdateKey. Bugs closed: AURORA-1093 Reviewed at https://reviews.apache.org/r/31136/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ec66a5ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ec66a5ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ec66a5ec Branch: refs/heads/master Commit: ec66a5ec68a825235e672843c3c6aaeeba2e1cdd Parents: 4b43305 Author: Bill Farner Authored: Tue Feb 17 17:33:10 2015 -0800 Committer: Bill Farner Committed: Tue Feb 17 17:33:10 2015 -0800 ---------------------------------------------------------------------- .../thrift/org/apache/aurora/gen/api.thrift | 4 + .../scheduler/storage/db/DBJobUpdateStore.java | 141 +++++++++++----- .../db/JobInstanceUpdateEventMapper.java | 5 +- .../storage/db/JobUpdateDetailsMapper.java | 74 +++++---- .../storage/db/JobUpdateEventMapper.java | 5 +- .../scheduler/storage/db/PruneVictim.java | 40 +++++ .../storage/db/JobInstanceUpdateEventMapper.xml | 8 +- .../storage/db/JobUpdateDetailsMapper.xml | 160 +++++++++++++------ .../storage/db/JobUpdateEventMapper.xml | 8 +- .../aurora/scheduler/storage/db/schema.sql | 19 +-- .../storage/db/DBJobUpdateStoreTest.java | 27 ++++ 11 files changed, 343 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index 2a77f28..11116f6 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -718,6 +718,7 @@ struct JobUpdateState { /** Summary of the job update including job key, user and current state. */ struct JobUpdateSummary { + // TODO(wfarner): As part of AURORA-1093, add a JobUpdateKey field, deprecate updateId and jobKey. /** Update ID. */ 1: string updateId @@ -786,6 +787,9 @@ struct JobUpdateQuery { /** Job role. */ 2: string role + /** Unique identifier for a job update. */ + 8: JobUpdateKey key + /** Job key. */ 3: JobKey jobKey http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java index 39f72cc..34c4ab5 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java @@ -21,19 +21,23 @@ import javax.inject.Inject; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.twitter.common.base.MorePreconditions; import org.apache.aurora.gen.JobUpdate; import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.storage.StoredJobUpdateDetails; import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdate; import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; import org.apache.aurora.scheduler.storage.entities.IRange; @@ -75,12 +79,14 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { "Missing both initial and desired states. At least one is required."); } - jobKeyMapper.merge(update.getSummary().getJobKey().newBuilder()); + IJobUpdateSummary summary = update.getSummary(); + jobKeyMapper.merge(summary.getJobKey().newBuilder()); detailsMapper.insert(update.newBuilder()); - String updateId = update.getSummary().getUpdateId(); + IJobUpdateKey key = IJobUpdateKey.build( + new JobUpdateKey(summary.getJobKey().newBuilder(), summary.getUpdateId())); if (lockToken.isPresent()) { - detailsMapper.insertLockToken(updateId, lockToken.get()); + detailsMapper.insertLockToken(key, lockToken.get()); } // Insert optional instance update overrides. @@ -88,20 +94,20 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { update.getInstructions().getSettings().getUpdateOnlyTheseInstances(); if (!instanceOverrides.isEmpty()) { - detailsMapper.insertInstanceOverrides(updateId, IRange.toBuildersSet(instanceOverrides)); + detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides)); } // Insert desired state task config and instance mappings. if (update.getInstructions().isSetDesiredState()) { IInstanceTaskConfig desired = update.getInstructions().getDesiredState(); detailsMapper.insertTaskConfig( - updateId, + key, desired.getTask().newBuilder(), true, new InsertResult()); detailsMapper.insertDesiredInstances( - updateId, + key, IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances()))); } @@ -109,7 +115,7 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { if (!update.getInstructions().getInitialState().isEmpty()) { for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) { InsertResult result = new InsertResult(); - detailsMapper.insertTaskConfig(updateId, config.getTask().newBuilder(), false, result); + detailsMapper.insertTaskConfig(key, config.getTask().newBuilder(), false, result); detailsMapper.insertTaskConfigInstances( result.getId(), @@ -121,13 +127,23 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { @Timed("job_update_store_save_event") @Override public void saveJobUpdateEvent(IJobUpdateEvent event, String updateId) { - jobEventMapper.insert(updateId, event.newBuilder()); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + jobEventMapper.insert(key.get(), event.newBuilder()); + } else { + throw new StorageException("No update to associate with update ID " + updateId); + } } @Timed("job_update_store_save_instance_event") @Override public void saveJobInstanceUpdateEvent(IJobInstanceUpdateEvent event, String updateId) { - instanceEventMapper.insert(event.newBuilder(), updateId); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + instanceEventMapper.insert(event.newBuilder(), key.get()); + } else { + throw new StorageException("No update to associate with update ID " + updateId); + } } @Timed("job_update_store_delete_all") @@ -136,6 +152,21 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { detailsMapper.truncate(); } + private static final Function GET_ROW_ID = new Function() { + @Override + public Long apply(PruneVictim victim) { + return victim.getRowId(); + } + }; + + private static final Function GET_UPDATE_ID = + new Function() { + @Override + public String apply(PruneVictim victim) { + return victim.getUpdate().getId(); + } + }; + @Timed("job_update_store_prune_history") @Override public Set pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) { @@ -146,14 +177,16 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { historyPruneThresholdMs); for (Long jobKeyId : jobKeyIdsToPrune) { - Set pruneVictims = detailsMapper.selectPruneVictims( + Set pruneVictims = detailsMapper.selectPruneVictims( jobKeyId, perJobRetainCount, historyPruneThresholdMs); - detailsMapper.deleteCompletedUpdates(pruneVictims); - pruned.addAll(pruneVictims); + detailsMapper.deleteCompletedUpdates( + FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet()); + pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_ID)); } + return pruned.build(); } @@ -179,37 +212,52 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { @Timed("job_update_store_fetch_details") @Override public Optional fetchJobUpdateDetails(final String updateId) { - return Optional.fromNullable(detailsMapper.selectDetails(updateId)) - .transform(new Function() { - @Override - public IJobUpdateDetails apply(StoredJobUpdateDetails input) { - return IJobUpdateDetails.build(input.getDetails()); - } - }); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + return Optional.fromNullable(detailsMapper.selectDetails(key.get())) + .transform(new Function() { + @Override + public IJobUpdateDetails apply(StoredJobUpdateDetails input) { + return IJobUpdateDetails.build(input.getDetails()); + } + }); + } else { + return Optional.absent(); + } } @Timed("job_update_store_fetch_update") @Override public Optional fetchJobUpdate(String updateId) { - return Optional.fromNullable(detailsMapper.selectUpdate(updateId)) - .transform(new Function() { - @Override - public IJobUpdate apply(JobUpdate input) { - return IJobUpdate.build(input); - } - }); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + return Optional.fromNullable(detailsMapper.selectUpdate(key.get())) + .transform(new Function() { + @Override + public IJobUpdate apply(JobUpdate input) { + return IJobUpdate.build(input); + } + }); + } else { + return Optional.absent(); + } } @Timed("job_update_store_fetch_instructions") @Override public Optional fetchJobUpdateInstructions(String updateId) { - return Optional.fromNullable(detailsMapper.selectInstructions(updateId)) - .transform(new Function() { - @Override - public IJobUpdateInstructions apply(JobUpdateInstructions input) { - return IJobUpdateInstructions.build(input); - } - }); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + return Optional.fromNullable(detailsMapper.selectInstructions(key.get())) + .transform(new Function() { + @Override + public IJobUpdateInstructions apply(JobUpdateInstructions input) { + return IJobUpdateInstructions.build(input); + } + }); + } else { + return Optional.absent(); + } } @Timed("job_update_store_fetch_all_details") @@ -218,19 +266,34 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable { return ImmutableSet.copyOf(detailsMapper.selectAllDetails()); } + private Optional fetchUpdateKey(String updateId) { + return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId)) + .transform(IJobUpdateKey.FROM_BUILDER); + } + @Timed("job_update_store_get_lock_token") @Override public Optional getLockToken(String updateId) { - // We assume here that cascading deletes will cause a lock-update associative row to disappear - // when the lock is invalidated. This further assumes that a lock row is deleted when a lock - // is no longer valid. - return Optional.fromNullable(detailsMapper.selectLockToken(updateId)); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + // We assume here that cascading deletes will cause a lock-update associative row to disappear + // when the lock is invalidated. This further assumes that a lock row is deleted when a lock + // is no longer valid. + return Optional.fromNullable(detailsMapper.selectLockToken(key.get())); + } else { + return Optional.absent(); + } } @Timed("job_update_store_fetch_instance_events") @Override public List fetchInstanceEvents(String updateId, int instanceId) { - return IJobInstanceUpdateEvent.listFromBuilders( - detailsMapper.selectInstanceUpdateEvents(updateId, instanceId)); + Optional key = fetchUpdateKey(updateId); + if (key.isPresent()) { + return IJobInstanceUpdateEvent.listFromBuilders( + detailsMapper.selectInstanceUpdateEvents(key.get(), instanceId)); + } else { + return ImmutableList.of(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java index d5dd5a5..4fb33bd 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.storage.db; import org.apache.aurora.gen.JobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.ibatis.annotations.Param; /** @@ -27,7 +28,7 @@ interface JobInstanceUpdateEventMapper { * Inserts a new job instance update event into the database. * * @param event Event to insert. - * @param updateId Update ID of the event. + * @param key Update key of the event. */ - void insert(@Param("event") JobInstanceUpdateEvent event, @Param("updateId") String updateId); + void insert(@Param("event") JobInstanceUpdateEvent event, @Param("key") IJobUpdateKey key); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java index 028eb7c..b1b6f11 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java @@ -21,11 +21,13 @@ import javax.annotation.Nullable; import org.apache.aurora.gen.JobInstanceUpdateEvent; import org.apache.aurora.gen.JobUpdate; import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.Range; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.ibatis.annotations.Param; /** @@ -45,23 +47,23 @@ interface JobUpdateDetailsMapper { /** * Inserts an association between an update and a lock. * - * @param updateId Unique update identifier. + * @param key Unique update identifier. * @param lockToken Unique lock identifier, resulting from * {@link org.apache.aurora.scheduler.storage.entities.ILock#getToken()}. */ - void insertLockToken(@Param("updateId") String updateId, @Param("lockToken") String lockToken); + void insertLockToken(@Param("key") IJobUpdateKey key, @Param("lockToken") String lockToken); /** * Inserts a task configuration entry for an update. * - * @param updateId Update ID to insert task configs for. + * @param key Update to insert task configs for. * @param taskConfig task configuration to insert. * @param isNew Flag to identify if the task config is existing {@code false} or * desired {@code true}. * @param result Container for auto-generated ID of the inserted job update row. */ void insertTaskConfig( - @Param("updateId") String updateId, + @Param("key") IJobUpdateKey key, @Param("config") TaskConfig taskConfig, @Param("isNew") boolean isNew, @Param("result") InsertResult result); @@ -80,23 +82,19 @@ interface JobUpdateDetailsMapper { * Maps update with an optional set of * {@link org.apache.aurora.gen.JobUpdateSettings#updateOnlyTheseInstances}. * - * @param updateId Update ID to store overrides for. + * @param key Update to store overrides for. * @param ranges Instance ID ranges to associate with an update. */ - void insertInstanceOverrides( - @Param("updateId") String updateId, - @Param("ranges") Set ranges); + void insertInstanceOverrides(@Param("key") IJobUpdateKey key, @Param("ranges") Set ranges); /** * Maps update with a set of instance IDs in * {@link org.apache.aurora.gen.JobUpdateInstructions#desiredState}. * - * @param updateId Update ID to store desired instances for. + * @param key Update to store desired instances for. * @param ranges Desired instance ID ranges to associate with an update. */ - void insertDesiredInstances( - @Param("updateId") String updateId, - @Param("ranges") Set ranges); + void insertDesiredInstances(@Param("key") IJobUpdateKey key, @Param("ranges") Set ranges); /** * Deletes all updates and events from the database. @@ -104,11 +102,11 @@ interface JobUpdateDetailsMapper { void truncate(); /** - * Deletes all updates and events with update ID in {@code updateIds}. + * Deletes all updates and events with update ID in {@code updates}. * - * @param updateIds Update IDs to delete. + * @param rowIds Row IDs of updates to delete. */ - void deleteCompletedUpdates(@Param("updateIds") Set updateIds); + void deleteCompletedUpdates(@Param("rowIds") Set rowIds); /** * Selects all distinct job key IDs associated with at least {@code perJobRetainCount} completed @@ -116,7 +114,7 @@ interface JobUpdateDetailsMapper { * * @param perJobRetainCount Number of updates to keep per job. * @param historyPruneThresholdMs History pruning timestamp threshold. - * @return Job key IDs. + * @return Job key database row IDs. */ Set selectJobKeysForPruning( @Param("retainCount") int perJobRetainCount, @@ -130,9 +128,9 @@ interface JobUpdateDetailsMapper { * @param jobKeyId Job key ID to select pruning victims for. * @param perJobRetainCount Number of updates to keep per job. * @param historyPruneThresholdMs History pruning timestamp threshold. - * @return Update IDs to prune. + * @return Victims to prune. */ - Set selectPruneVictims( + Set selectPruneVictims( @Param("keyId") long jobKeyId, @Param("retainCount") int perJobRetainCount, @Param("pruneThresholdMs") long historyPruneThresholdMs); @@ -147,13 +145,25 @@ interface JobUpdateDetailsMapper { List selectSummaries(JobUpdateQuery query); /** - * Gets details for the provided {@code updateId}. + * Selects an update key by the update ID field contained within it. + * + *

+ * TODO(wfarner): Remove this in the final phase of AURORA-1093. + * + * @param updateId Update to search by. + * @return The update key that {@code updateId} represents. + */ + @Nullable + JobUpdateKey selectUpdateKey(String updateId); + + /** + * Gets details for the provided {@code key}. * - * @param updateId Update ID to get. + * @param key Update to get. * @return Job update details for the provided update ID, if it exists. */ @Nullable - StoredJobUpdateDetails selectDetails(String updateId); + StoredJobUpdateDetails selectDetails(@Param("key") IJobUpdateKey key); /** * Gets all job update details matching the provided {@code query}. @@ -165,22 +175,22 @@ interface JobUpdateDetailsMapper { List selectDetailsList(JobUpdateQuery query); /** - * Gets job update for the provided {@code updateId}. + * Gets job update for the provided {@code update}. * - * @param updateId Update ID to select by. + * @param key Update to select by. * @return Job update for the provided update ID, if it exists. */ @Nullable - JobUpdate selectUpdate(String updateId); + JobUpdate selectUpdate(@Param("key") IJobUpdateKey key); /** - * Gets job update instructions for the provided {@code updateId}. + * Gets job update instructions for the provided {@code update}. * - * @param updateId Update ID to select by. + * @param key Update to select by. * @return Job update instructions for the provided update ID, if it exists. */ @Nullable - JobUpdateInstructions selectInstructions(String updateId); + JobUpdateInstructions selectInstructions(@Param("key") IJobUpdateKey key); /** * Gets all stored job update details. @@ -192,20 +202,20 @@ interface JobUpdateDetailsMapper { /** * Gets the token associated with an update. * - * @param updateId Update identifier. + * @param key Update identifier. * @return The associated lock token, or {@code null} if no association exists. */ @Nullable - String selectLockToken(String updateId); + String selectLockToken(@Param("key") IJobUpdateKey key); /** * Gets job instance update events for a specific instance within an update. * - * @param updateId Update identifier. + * @param key Update identifier. * @param instanceId Instance to fetch events for. - * @return Instance events affecting {@code instanceId} within {@code updateId}. + * @return Instance events affecting {@code instanceId} within {@code key}. */ List selectInstanceUpdateEvents( - @Param("updateId") String updateId, + @Param("key") IJobUpdateKey key, @Param("instanceId") int instanceId); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.java index bbd2f46..d1a3c3f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.storage.db; import org.apache.aurora.gen.JobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; import org.apache.ibatis.annotations.Param; /** @@ -26,8 +27,8 @@ interface JobUpdateEventMapper { /** * Inserts a new job update event into the database. * - * @param updateId Update ID of the event. + * @param key ID of the update associated with the event. * @param event Event to insert. */ - void insert(@Param("updateId") String updateId, @Param("event") JobUpdateEvent event); + void insert(@Param("key") IJobUpdateKey key, @Param("event") JobUpdateEvent event); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/java/org/apache/aurora/scheduler/storage/db/PruneVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/PruneVictim.java b/src/main/java/org/apache/aurora/scheduler/storage/db/PruneVictim.java new file mode 100644 index 0000000..144f5a3 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/PruneVictim.java @@ -0,0 +1,40 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db; + +import org.apache.aurora.gen.JobUpdateKey; + +/** + * A job update that should be pruned. + */ +public class PruneVictim { + private long rowId; + private JobUpdateKey update; + + public long getRowId() { + return rowId; + } + + public JobUpdateKey getUpdate() { + return update; + } + + public void setRowId(long rowId) { + this.rowId = rowId; + } + + public void setUpdate(JobUpdateKey update) { + this.update = update; + } +} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/resources/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.xml index 1bc2a62..1b58406 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobInstanceUpdateEventMapper.xml @@ -19,16 +19,12 @@ INSERT INTO job_instance_update_events ( - update_id, + update_row_id, action, instance_id, timestamp_ms ) VALUES ( - ( - SELECT ID - FROM job_updates - WHERE update_id = #{updateId} - ), + , #{event.action, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateActionTypeHandler}, #{event.instanceId}, #{event.timestampMs} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml index 7fd3a86..7685597 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml @@ -17,11 +17,27 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> - + + INNER JOIN job_keys AS j ON j.id = u.job_key_id + + + + u.update_id = #{key.id} + AND j.role = #{key.job.role} + AND j.environment = #{key.job.environment} + AND j.name = #{key.job.name} + + + ( - SELECT id - FROM job_updates - WHERE update_id = #{updateId} + SELECT u.id + FROM job_updates AS u + + + WHERE ) @@ -61,29 +77,29 @@ INSERT INTO job_update_locks ( - update_id, + update_row_id, lock_token ) VALUES ( - , + , #{lockToken} ) INSERT INTO job_update_configs ( - update_id, + update_row_id, task_config, is_new ) VALUES ( - , + , #{config, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.TaskConfigTypeHandler}, #{isNew} ) - + - , + , #{element.first}, #{element.last} @@ -104,20 +120,20 @@ INSERT INTO job_updates_to_instance_overrides ( - update_id, + update_row_id, first, last ) VALUES - + INSERT INTO job_updates_to_desired_instances ( - update_id, + update_row_id, first, last ) VALUES - + @@ -205,51 +221,51 @@ INNER JOIN ( SELECT - e_s.update_id, + e_s.update_row_id, e_s.status FROM job_update_events AS e_s INNER JOIN ( SELECT - update_id, + update_row_id, MAX(timestamp_ms) AS timestamp_ms FROM job_update_events - GROUP BY update_id - ) AS e_t ON e_t.update_id = e_s.update_id AND e_t.timestamp_ms = e_s.timestamp_ms - ) AS max_status ON max_status.update_id = u.id + GROUP BY update_row_id + ) AS e_t ON e_t.update_row_id = e_s.update_row_id AND e_t.timestamp_ms = e_s.timestamp_ms + ) AS max_status ON max_status.update_row_id = u.id INNER JOIN ( SELECT - update_id, + update_row_id, MIN(timestamp_ms) AS timestamp_ms FROM job_update_events - GROUP BY update_id - ) AS min_ts ON min_ts.update_id = u.id + GROUP BY update_row_id + ) AS min_ts ON min_ts.update_row_id = u.id INNER JOIN ( SELECT - update_id, + update_row_id, MAX(timestamp_ms) AS timestamp_ms FROM ( SELECT - update_id, + update_row_id, timestamp_ms FROM job_update_events UNION ALL SELECT - update_id, + update_row_id, timestamp_ms FROM job_instance_update_events ) - GROUP BY update_id - ) AS max_ts ON max_ts.update_id = u.id + GROUP BY update_row_id + ) AS max_ts ON max_ts.update_row_id = u.id @@ -258,14 +274,13 @@ - - INNER JOIN job_keys AS j ON j.id = u.job_key_id - - - + WHERE TRUE AND u.update_id = #{updateId} + + AND + AND u.user = #{user} AND j.role = #{role} @@ -304,6 +319,17 @@ + + @@ -344,15 +370,15 @@ - LEFT OUTER JOIN job_update_configs AS cn ON cn.update_id = u.id AND cn.is_new = TRUE - LEFT OUTER JOIN job_update_configs AS co ON co.update_id = u.id AND co.is_new = FALSE + LEFT OUTER JOIN job_update_configs AS cn ON cn.update_row_id = u.id AND cn.is_new = TRUE + LEFT OUTER JOIN job_update_configs AS co ON co.update_row_id = u.id AND co.is_new = FALSE LEFT OUTER JOIN job_update_configs_to_instances AS ci ON ci.config_id = co.id - LEFT OUTER JOIN job_updates_to_desired_instances AS di ON di.update_id = u.id - LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_id = u.id + LEFT OUTER JOIN job_updates_to_desired_instances AS di ON di.update_row_id = u.id + LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_row_id = u.id - LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id + LEFT OUTER JOIN job_update_locks AS l on l.update_row_id = u.id @@ -371,8 +397,8 @@ - LEFT OUTER JOIN job_update_events AS e ON e.update_id = u.id - LEFT OUTER JOIN job_instance_update_events AS i ON i.update_id = u.id + LEFT OUTER JOIN job_update_events AS e ON e.update_row_id = u.id + LEFT OUTER JOIN job_instance_update_events AS i ON i.update_row_id = u.id @@ -407,7 +433,7 @@ FROM job_updates AS u - WHERE u.update_id = #{id} + WHERE @@ -439,8 +465,9 @@ SELECT lock_token FROM job_update_locks AS l - INNER JOIN job_updates u ON l.update_id = u.id - WHERE u.update_id = #{id} + INNER JOIN job_updates u ON l.update_row_id = u.id + + WHERE @@ -477,12 +506,34 @@ WHERE min_ts.timestamp_ms < #{pruneThresholdMs} AND l.id IS NULL - + SELECT + row_id, + u_id, + u_jk_role, + u_jk_environment, + u_jk_name + FROM ( SELECT - u.update_id as id + u.id as row_id, + u.update_id AS u_id, + j.role AS u_jk_role, + j.environment AS u_jk_environment, + j.name AS u_jk_name FROM job_updates as u + WHERE u.job_key_id = #{keyId} @@ -493,8 +544,13 @@ ) UNION SELECT - u.update_id as id + u.id, + u.update_id AS u_id, + j.role AS u_jk_role, + j.environment AS u_jk_environment, + j.name AS u_jk_name FROM job_updates as u + WHERE u.job_key_id = #{keyId} @@ -504,8 +560,8 @@ DELETE FROM job_updates - WHERE update_id IN - + WHERE id IN + #{element} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.xml index 9f23c4b..d813b19 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateEventMapper.xml @@ -19,16 +19,12 @@ INSERT INTO job_update_events ( - update_id, + update_row_id, status, user, timestamp_ms ) VALUES ( - ( - SELECT id - FROM job_updates - WHERE update_id = #{updateId} - ), + , #{event.status, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler}, #{event.user}, #{event.timestampMs} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql index 987596f..e6184dd 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql @@ -108,41 +108,42 @@ CREATE TABLE job_updates( wait_for_batch_completion BOOLEAN NOT NULL, block_if_no_pulses_after_ms INT NULL, + -- TODO(wfarner): Convert this to UNIQUE(job_key_id, update_id) to complete AURORA-1093. UNIQUE(update_id) ); CREATE TABLE job_update_locks( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, lock_token VARCHAR NOT NULL REFERENCES locks(token) ON DELETE CASCADE, - UNIQUE(update_id), + UNIQUE(update_row_id), UNIQUE(lock_token) ); CREATE TABLE job_update_configs( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, task_config BINARY NOT NULL, is_new BOOLEAN NOT NULL ); CREATE TABLE job_updates_to_instance_overrides( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, first INT NOT NULL, last INT NOT NULL, - UNIQUE(update_id, first, last) + UNIQUE(update_row_id, first, last) ); CREATE TABLE job_updates_to_desired_instances( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, first INT NOT NULL, last INT NOT NULL, - UNIQUE(update_id, first, last) + UNIQUE(update_row_id, first, last) ); CREATE TABLE job_update_configs_to_instances( @@ -156,7 +157,7 @@ CREATE TABLE job_update_configs_to_instances( CREATE TABLE job_update_events( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, status INT NOT NULL REFERENCES job_update_statuses(id), timestamp_ms BIGINT NOT NULL, user VARCHAR @@ -164,7 +165,7 @@ CREATE TABLE job_update_events( CREATE TABLE job_instance_update_events( id IDENTITY, - update_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, + update_row_id BIGINT NOT NULL REFERENCES job_updates(id) ON DELETE CASCADE, action INT NOT NULL REFERENCES job_instance_update_actions(id), instance_id INT NOT NULL, timestamp_ms BIGINT NOT NULL http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ec66a5ec/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java index be5e772..156cbc4 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java @@ -30,6 +30,7 @@ import org.apache.aurora.gen.JobUpdateAction; import org.apache.aurora.gen.JobUpdateDetails; import org.apache.aurora.gen.JobUpdateEvent; import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateSettings; import org.apache.aurora.gen.JobUpdateState; @@ -73,6 +74,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class DBJobUpdateStoreTest { @@ -109,6 +111,15 @@ public class DBJobUpdateStoreTest { saveUpdate(update2, Optional.absent()); assertUpdate(update2); + + // Colliding update IDs should be forbidden. + IJobUpdate update3 = makeJobUpdate(JobKeys.from("role", "env", "name3"), updateId2); + try { + saveUpdate(update3, Optional.absent()); + fail("Update ID collision should not be allowed"); + } catch (StorageException e) { + // Expected. + } } @Test @@ -622,6 +633,22 @@ public class DBJobUpdateStoreTest { ImmutableList.of(s5), getSummaries(new JobUpdateQuery().setJobKey(job5.newBuilder()))); + // Test querying by update key. + assertEquals( + ImmutableList.of(s5), + getSummaries( + new JobUpdateQuery().setKey(new JobUpdateKey(job5.newBuilder(), s5.getUpdateId())))); + + // Test querying by incorrect update keys. + assertEquals( + ImmutableList.of(), + getSummaries( + new JobUpdateQuery().setKey(new JobUpdateKey(job5.newBuilder(), s4.getUpdateId())))); + assertEquals( + ImmutableList.of(), + getSummaries( + new JobUpdateQuery().setKey(new JobUpdateKey(job4.newBuilder(), s5.getUpdateId())))); + // Test query by user. assertEquals(ImmutableList.of(s2, s1), getSummaries(new JobUpdateQuery().setUser("user")));