Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 91B14200D27 for ; Wed, 25 Oct 2017 08:34:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 90881160BDA; Wed, 25 Oct 2017 06:34:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8D797160BF2 for ; Wed, 25 Oct 2017 08:34:17 +0200 (CEST) Received: (qmail 14215 invoked by uid 500); 25 Oct 2017 06:34:16 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 14200 invoked by uid 99); 25 Oct 2017 06:34:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Oct 2017 06:34:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E9D1DFF40; Wed, 25 Oct 2017 06:34:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Wed, 25 Oct 2017 06:34:18 -0000 Message-Id: <292fb5608f2849de9fed40cd3b45e961@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/5] aurora git commit: Exclusively use Map-based in-memory stores for primary storage archived-at: Wed, 25 Oct 2017 06:34:19 -0000 http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java new file mode 100644 index 0000000..d190add --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java @@ -0,0 +1,396 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.aurora.scheduler.storage.mem; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.gen.JobInstanceUpdateEvent; +import org.apache.aurora.gen.JobUpdateAction; +import org.apache.aurora.gen.JobUpdateDetails; +import org.apache.aurora.gen.JobUpdateEvent; +import org.apache.aurora.gen.JobUpdateState; +import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.LockKey; +import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.LockStore; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; +import org.apache.aurora.scheduler.storage.entities.ILock; +import org.apache.aurora.scheduler.storage.entities.ILockKey; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName; +import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName; + +public class MemJobUpdateStore implements JobUpdateStore.Mutable { + + private static final Ordering REVERSE_LAST_MODIFIED_ORDER = Ordering.natural() + .reverse() + .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs()); + + private final Map updates = Maps.newConcurrentMap(); + private final LockStore lockStore; + private final LoadingCache jobUpdateEventStats; + private final LoadingCache jobUpdateActionStats; + + @Inject + public MemJobUpdateStore(LockStore.Mutable lockStore, StatsProvider statsProvider) { + this.lockStore = lockStore; + this.jobUpdateEventStats = CacheBuilder.newBuilder() + .build(new CacheLoader() { + @Override + public AtomicLong load(JobUpdateStatus status) { + return statsProvider.makeCounter(jobUpdateStatusStatName(status)); + } + }); + for (JobUpdateStatus status : JobUpdateStatus.values()) { + jobUpdateEventStats.getUnchecked(status).get(); + } + this.jobUpdateActionStats = CacheBuilder.newBuilder() + .build(new CacheLoader() { + @Override + public AtomicLong load(JobUpdateAction action) { + return statsProvider.makeCounter(jobUpdateActionStatName(action)); + } + }); + for (JobUpdateAction action : JobUpdateAction.values()) { + jobUpdateActionStats.getUnchecked(action).get(); + } + } + + @Timed("job_update_store_fetch_summaries") + @Override + public synchronized List fetchJobUpdateSummaries(IJobUpdateQuery query) { + return performQuery(query) + .map(u -> u.getUpdate().getSummary()) + .collect(Collectors.toList()); + } + + @Timed("job_update_store_fetch_details_list") + @Override + public synchronized List fetchJobUpdateDetails(IJobUpdateQuery query) { + return performQuery(query).collect(Collectors.toList()); + } + + @Timed("job_update_store_fetch_details") + @Override + public synchronized Optional fetchJobUpdateDetails(IJobUpdateKey key) { + return Optional.fromNullable(updates.get(key)).transform(u -> u.details); + } + + @Timed("job_update_store_fetch_update") + @Override + public synchronized Optional fetchJobUpdate(IJobUpdateKey key) { + return Optional.fromNullable(updates.get(key)).transform(u -> u.details.getUpdate()); + } + + @Timed("job_update_store_fetch_instructions") + @Override + public synchronized Optional fetchJobUpdateInstructions( + IJobUpdateKey key) { + + return Optional.fromNullable(updates.get(key)) + .transform(u -> u.details.getUpdate().getInstructions()); + } + + private void refreshLocks() { + // Simulate database behavior of join performed against locks, used to populate lockToken field. + + ImmutableMap.Builder refreshed = ImmutableMap.builder(); + for (Map.Entry entry : updates.entrySet()) { + IJobUpdateDetails update = entry.getValue().details; + Optional updateLock = entry.getValue().lockToken; + if (updateLock.isPresent()) { + // Determine if token needs to be cleared to reflect lock store state. The token may only + // remain if the lock store token exists and matches. + Optional storedLock = Optional.fromNullable(lockStore.fetchLock(ILockKey.build( + LockKey.job(entry.getKey().getJob().newBuilder()))).map(ILock::getToken).orElse(null)); + if (!storedLock.isPresent() || !updateLock.equals(storedLock)) { + refreshed.put(entry.getKey(), new UpdateAndLock(update, Optional.absent())); + } + } + } + + updates.putAll(refreshed.build()); + } + + @Timed("job_update_store_fetch_all_details") + @Override + public synchronized Set fetchAllJobUpdateDetails() { + refreshLocks(); + return updates.values().stream() + .map(u -> new StoredJobUpdateDetails(u.details.newBuilder(), u.lockToken.orNull())) + .collect(Collectors.toSet()); + } + + @Timed("job_update_store_fetch_instance_events") + @Override + public synchronized List fetchInstanceEvents( + IJobUpdateKey key, + int instanceId) { + + return java.util.Optional.ofNullable(updates.get(key)) + .map(u -> u.details.getInstanceEvents()) + .orElse(ImmutableList.of()) + .stream() + .filter(e -> e.getInstanceId() == instanceId) + .collect(Collectors.toList()); + } + + private static void validateInstructions(IJobUpdateInstructions instructions) { + if (!instructions.isSetDesiredState() && instructions.getInitialState().isEmpty()) { + throw new IllegalArgumentException( + "Missing both initial and desired states. At least one is required."); + } + + if (!instructions.getInitialState().isEmpty()) { + if (instructions.getInitialState().stream().anyMatch(t -> t.getTask() == null)) { + throw new NullPointerException("Invalid initial instance state."); + } + Preconditions.checkArgument( + instructions.getInitialState().stream().noneMatch(t -> t.getInstances().isEmpty()), + "Invalid intial instance state ranges."); + } + + if (instructions.getDesiredState() != null) { + MorePreconditions.checkNotBlank(instructions.getDesiredState().getInstances()); + Preconditions.checkNotNull(instructions.getDesiredState().getTask()); + } + } + + @Timed("job_update_store_save_update") + @Override + public synchronized void saveJobUpdate(IJobUpdate update, Optional lockToken) { + requireNonNull(update); + validateInstructions(update.getInstructions()); + + if (updates.containsKey(update.getSummary().getKey())) { + throw new StorageException("Update already exists: " + update.getSummary().getKey()); + } + + JobUpdateDetails mutable = new JobUpdateDetails() + .setUpdate(update.newBuilder()) + .setUpdateEvents(ImmutableList.of()) + .setInstanceEvents(ImmutableList.of()); + mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); + + updates.put( + update.getSummary().getKey(), + new UpdateAndLock(IJobUpdateDetails.build(mutable), lockToken)); + } + + private static final Ordering EVENT_ORDERING = Ordering.natural() + .onResultOf(JobUpdateEvent::getTimestampMs); + + @Timed("job_update_store_save_event") + @Override + public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { + UpdateAndLock update = updates.get(key); + if (update == null) { + throw new StorageException("Update not found: " + key); + } + + JobUpdateDetails mutable = update.details.newBuilder(); + mutable.addToUpdateEvents(event.newBuilder()); + mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents())); + mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); + updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken)); + jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet(); + } + + private static final Ordering INSTANCE_EVENT_ORDERING = Ordering.natural() + .onResultOf(JobInstanceUpdateEvent::getTimestampMs); + + @Timed("job_update_store_save_instance_event") + @Override + public synchronized void saveJobInstanceUpdateEvent( + IJobUpdateKey key, + IJobInstanceUpdateEvent event) { + + UpdateAndLock update = updates.get(key); + if (update == null) { + throw new StorageException("Update not found: " + key); + } + + JobUpdateDetails mutable = update.details.newBuilder(); + mutable.addToInstanceEvents(event.newBuilder()); + mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents())); + mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable)); + updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken)); + jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet(); + } + + @Timed("job_update_store_delete_all") + @Override + public synchronized void deleteAllUpdatesAndEvents() { + updates.clear(); + } + + @Timed("job_update_store_prune_history") + @Override + public synchronized Set pruneHistory( + int perJobRetainCount, + long historyPruneThresholdMs) { + + Supplier> completedUpdates = () -> updates.values().stream() + .map(u -> u.details.getUpdate().getSummary()) + .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus())); + + Predicate expiredFilter = + s -> s.getState().getCreatedTimestampMs() < historyPruneThresholdMs; + + ImmutableSet.Builder pruneBuilder = ImmutableSet.builder(); + + // Gather updates based on time threshold. + pruneBuilder.addAll(completedUpdates.get() + .filter(expiredFilter) + .map(IJobUpdateSummary::getKey) + .collect(Collectors.toList())); + + Multimap updatesByJob = Multimaps.index( + // Avoid counting to-be-removed expired updates. + completedUpdates.get().filter(expiredFilter.negate()).iterator(), + s -> s.getKey().getJob()); + + for (Map.Entry> entry + : updatesByJob.asMap().entrySet()) { + + if (entry.getValue().size() > perJobRetainCount) { + Ordering creationOrder = Ordering.natural() + .onResultOf(s -> s.getState().getCreatedTimestampMs()); + pruneBuilder.addAll(creationOrder + .leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount) + .stream() + .map(s -> s.getKey()) + .iterator()); + } + } + + Set pruned = pruneBuilder.build(); + updates.keySet().removeAll(pruned); + + return pruned; + } + + private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) { + JobUpdateState state = update.getUpdate().getSummary().getState(); + if (state == null) { + state = new JobUpdateState(); + } + + JobUpdateEvent firstEvent = Iterables.getFirst(update.getUpdateEvents(), null); + if (firstEvent != null) { + state.setCreatedTimestampMs(firstEvent.getTimestampMs()); + } + + JobUpdateEvent lastEvent = Iterables.getLast(update.getUpdateEvents(), null); + if (lastEvent != null) { + state.setStatus(lastEvent.getStatus()); + state.setLastModifiedTimestampMs(lastEvent.getTimestampMs()); + } + + JobInstanceUpdateEvent lastInstanceEvent = Iterables.getLast(update.getInstanceEvents(), null); + if (lastInstanceEvent != null) { + state.setLastModifiedTimestampMs( + Longs.max(state.getLastModifiedTimestampMs(), lastInstanceEvent.getTimestampMs())); + } + + return state; + } + + private Stream performQuery(IJobUpdateQuery query) { + Predicate filter = u -> true; + if (query.getRole() != null) { + filter = filter.and( + u -> u.getUpdate().getSummary().getKey().getJob().getRole().equals(query.getRole())); + } + if (query.getKey() != null) { + filter = filter.and(u -> u.getUpdate().getSummary().getKey().equals(query.getKey())); + } + if (query.getJobKey() != null) { + filter = filter.and( + u -> u.getUpdate().getSummary().getKey().getJob().equals(query.getJobKey())); + } + if (query.getUser() != null) { + filter = filter.and(u -> u.getUpdate().getSummary().getUser().equals(query.getUser())); + } + if (query.getUpdateStatuses() != null && !query.getUpdateStatuses().isEmpty()) { + filter = filter.and(u -> query.getUpdateStatuses() + .contains(u.getUpdate().getSummary().getState().getStatus())); + } + + // TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as + // such here. The behavior is carried over from DbJobupdateStore; determine if it is desired. + Stream matches = updates.values().stream() + .map(u -> u.details) + .filter(filter) + .sorted(REVERSE_LAST_MODIFIED_ORDER) + .skip(query.getOffset()); + + if (query.getLimit() > 0) { + matches = matches.limit(query.getLimit()); + } + + return matches; + } + + private static final class UpdateAndLock { + private final IJobUpdateDetails details; + private final Optional lockToken; + + UpdateAndLock(IJobUpdateDetails details, Optional lockToken) { + this.details = details; + this.lockToken = lockToken; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java new file mode 100644 index 0000000..4c7bda8 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java @@ -0,0 +1,72 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import org.apache.aurora.scheduler.storage.LockStore; +import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.entities.ILock; +import org.apache.aurora.scheduler.storage.entities.ILockKey; + +/** + * An in-memory lock store. + */ +class MemLockStore implements LockStore.Mutable { + + private final Map locks = Maps.newConcurrentMap(); + + @Override + public void saveLock(ILock lock) { + // TODO(wfarner): Re-evaluate, this is not idempotent. + if (locks.containsKey(lock.getKey())) { + throw new StorageException("Duplicate lock key"); + } + if (FluentIterable.from(locks.values()) + .transform(ILock::getToken) + .anyMatch(Predicates.equalTo(lock.getToken()))) { + + throw new StorageException("Duplicate token"); + } + + locks.put(lock.getKey(), lock); + } + + @Override + public void removeLock(ILockKey lockKey) { + locks.remove(lockKey); + } + + @Override + public void deleteLocks() { + locks.clear(); + } + + @Override + public Set fetchLocks() { + return ImmutableSet.copyOf(locks.values()); + } + + @Override + public Optional fetchLock(ILockKey lockKey) { + return Optional.ofNullable(locks.get(lockKey)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java new file mode 100644 index 0000000..61b9eee --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java @@ -0,0 +1,56 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import java.util.Map; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; + +/** + * An in-memory quota store. + */ +class MemQuotaStore implements QuotaStore.Mutable { + + private final Map quotas = Maps.newConcurrentMap(); + + @Override + public void deleteQuotas() { + quotas.clear(); + } + + @Override + public void removeQuota(String role) { + quotas.remove(role); + } + + @Override + public void saveQuota(String role, IResourceAggregate quota) { + quotas.put(role, quota); + } + + @Override + public Optional fetchQuota(String role) { + return Optional.fromNullable(quotas.get(role)); + } + + @Override + public Map fetchQuotas() { + return ImmutableMap.copyOf(quotas); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java new file mode 100644 index 0000000..8a7ddd7 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Atomics; + +import org.apache.aurora.scheduler.storage.SchedulerStore; + +/** + * An in-memory scheduler store. + */ +class MemSchedulerStore implements SchedulerStore.Mutable { + private final AtomicReference frameworkId = Atomics.newReference(); + + @Override + public void saveFrameworkId(String newFrameworkId) { + frameworkId.set(newFrameworkId); + } + + @Override + public Optional fetchFrameworkId() { + return Optional.fromNullable(frameworkId.get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java new file mode 100644 index 0000000..7ace104 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java @@ -0,0 +1,103 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import javax.inject.Inject; + +import org.apache.aurora.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.LockStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.TaskStore; + +/** + * A storage implementation comprised of individual in-memory store implementations. + */ +public class MemStorage implements Storage { + private final MutableStoreProvider storeProvider; + + @Inject + MemStorage( + @Volatile final SchedulerStore.Mutable schedulerStore, + @Volatile final CronJobStore.Mutable jobStore, + @Volatile final TaskStore.Mutable taskStore, + @Volatile final LockStore.Mutable lockStore, + @Volatile final QuotaStore.Mutable quotaStore, + @Volatile final AttributeStore.Mutable attributeStore, + @Volatile final JobUpdateStore.Mutable updateStore) { + + storeProvider = new MutableStoreProvider() { + @Override + public SchedulerStore.Mutable getSchedulerStore() { + return schedulerStore; + } + + @Override + public CronJobStore.Mutable getCronJobStore() { + return jobStore; + } + + @Override + public TaskStore getTaskStore() { + return taskStore; + } + + @Override + public TaskStore.Mutable getUnsafeTaskStore() { + return taskStore; + } + + @Override + public LockStore.Mutable getLockStore() { + return lockStore; + } + + @Override + public QuotaStore.Mutable getQuotaStore() { + return quotaStore; + } + + @Override + public AttributeStore.Mutable getAttributeStore() { + return attributeStore; + } + + @Override + public JobUpdateStore.Mutable getJobUpdateStore() { + return updateStore; + } + }; + } + + @Timed("mem_storage_read_operation") + @Override + public T read(final Work work) throws StorageException, E { + return work.apply(storeProvider); + } + + @Timed("mem_storage_write_operation") + @Override + public T write(final MutateWork work) throws StorageException, E { + return work.apply(storeProvider); + } + + @Override + public void prepare() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java new file mode 100644 index 0000000..2ad84eb --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java @@ -0,0 +1,107 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import javax.inject.Singleton; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.inject.Bindings.KeyFactory; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.LockStore; +import org.apache.aurora.scheduler.storage.QuotaStore; +import org.apache.aurora.scheduler.storage.SchedulerStore; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.mem.MemTaskStore.SlowQueryThreshold; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; + +import static java.util.Objects.requireNonNull; + +/** + * Binding module for in-memory stores. + *

+ * NOTE: These stores are being phased out in favor of database-backed stores. + */ +public final class MemStorageModule extends PrivateModule { + + private final KeyFactory keyFactory; + + public MemStorageModule() { + this(KeyFactory.PLAIN); + } + + public MemStorageModule(KeyFactory keyFactory) { + this.keyFactory = requireNonNull(keyFactory); + } + + private void bindStore(Class binding, Class impl) { + bind(binding).to(impl); + bind(impl).in(Singleton.class); + Key key = Key.get(binding, Volatile.class); + bind(key).to(impl); + expose(key); + expose(binding); + } + + @Override + protected void configure() { + bind(new TypeLiteral>() { }).annotatedWith(SlowQueryThreshold.class) + .toInstance(Amount.of(25L, Time.MILLISECONDS)); + bindStore(TaskStore.Mutable.class, MemTaskStore.class); + bindStore(CronJobStore.Mutable.class, MemCronJobStore.class); + bindStore(AttributeStore.Mutable.class, MemAttributeStore.class); + bindStore(LockStore.Mutable.class, MemLockStore.class); + bindStore(QuotaStore.Mutable.class, MemQuotaStore.class); + bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class); + bindStore(JobUpdateStore.Mutable.class, MemJobUpdateStore.class); + + Key storageKey = keyFactory.create(Storage.class); + bind(storageKey).to(MemStorage.class); + bind(MemStorage.class).in(Singleton.class); + expose(storageKey); + } + + /** + * Creates a new empty in-memory storage for use in testing. + */ + @VisibleForTesting + public static Storage newEmptyStorage() { + Injector injector = Guice.createInjector( + new MemStorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).to(FakeStatsProvider.class); + bind(FakeStatsProvider.class).in(Singleton.class); + } + }); + + Storage storage = injector.getInstance(Storage.class); + storage.prepare(); + return storage; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java deleted file mode 100644 index c28fb65..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.mem; - -import com.google.common.base.Function; - -import org.apache.thrift.TBase; - -/** - * Utility class for common operations amongst in-memory store implementations. - */ -final class Util { - - private Util() { - // Utility class. - } - - /** - * Creates a function that performs a 'deep copy' on a thrift struct of a specific type. The - * resulting copied objects will be exactly identical to the original. Mutations to the original - * object will not be reflected in the copy, and vice versa. - * - * @return A copier for the provided type of thrift structs. - */ - static > Function deepCopier() { - return input -> { - if (input == null) { - return null; - } - - @SuppressWarnings("unchecked") - T t = (T) input.deepCopy(); - return t; - }; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 3d35e97..27c0b43 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -77,8 +77,6 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.aurora.gen.JobUpdateStatus.ABORTED; import static org.apache.aurora.gen.JobUpdateStatus.ERROR; -import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK; -import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; @@ -470,14 +468,6 @@ class JobUpdateControllerImpl implements JobUpdateController { changeJobUpdateStatus(storeProvider, key, event, true); } - private static final Set TERMINAL_STATES = ImmutableSet.of( - ROLLED_FORWARD, - ROLLED_BACK, - ABORTED, - JobUpdateStatus.FAILED, - ERROR - ); - private void changeJobUpdateStatus( MutableStoreProvider storeProvider, IJobUpdateKey key, @@ -494,7 +484,7 @@ class JobUpdateControllerImpl implements JobUpdateController { IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status))); } - if (TERMINAL_STATES.contains(status)) { + if (JobUpdateStore.TERMINAL_STATES.contains(status)) { lockManager.releaseLock(key.getJob()); pulseHandler.remove(key); } else { http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index efbc42c..67a0d5a 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -218,7 +218,7 @@ public class SchedulerIT extends BaseZooKeeperTest { ImmutableList.builder() .add(SchedulerMain.getUniversalModule(new CliOptions())) .add(new TierModule(TaskTestUtil.TIER_CONFIG)) - .add(new LogStorageModule(new LogStorageModule.Options(), false)) + .add(new LogStorageModule(new LogStorageModule.Options())) .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH)) .add(testModule) .build() http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java index 3dee55a..c639ab6 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java @@ -15,18 +15,15 @@ package org.apache.aurora.scheduler.app.local; import java.io.File; import java.util.List; -import java.util.Set; import javax.inject.Singleton; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.io.Files; import com.google.inject.AbstractModule; import com.google.inject.Key; import com.google.inject.Module; -import com.google.inject.TypeLiteral; import com.google.inject.util.Modules; import org.apache.aurora.scheduler.TierModule; @@ -40,8 +37,6 @@ import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory; import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl; -import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.HydrateSnapshotFields; import org.apache.mesos.SchedulerDriver; import org.apache.mesos.v1.Protos; import org.apache.shiro.io.ResourceUtils; @@ -79,16 +74,12 @@ public final class LocalSchedulerMain { .add("-shiro_ini_path=" + ResourceUtils.CLASSPATH_PREFIX + "org/apache/aurora/scheduler/http/api/security/shiro-example.ini") - .add("-enable_h2_console=true") .build(); CliOptions options = CommandLine.parseOptions(arguments.toArray(new String[] {})); Module persistentStorage = new AbstractModule() { @Override protected void configure() { - bind(new TypeLiteral() { }) - .annotatedWith(SnapshotStoreImpl.ExperimentalTaskStore.class) - .toInstance(false); bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class)); bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class); bind(DistributedSnapshotStore.class).toInstance(snapshot -> { }); @@ -108,8 +99,6 @@ public final class LocalSchedulerMain { bind(FrameworkInfoFactory.class).to(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class); bind(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class).in(Singleton.class); install(new ClusterSimulatorModule()); - bind(new TypeLiteral>() { }).annotatedWith(HydrateSnapshotFields.class) - .toInstance(ImmutableSet.of()); } }; http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index f7c945d..244422c 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -29,7 +29,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.AbstractModule; @@ -170,17 +169,9 @@ public class CommandLineTest { expected.updater.enableAffinity = true; expected.updater.affinityExpiration = TEST_TIME; expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class); - expected.db.useDbTaskStore = true; - expected.db.enableDbMetrics = false; - expected.db.slowQueryLogThreshold = TEST_TIME; - expected.db.dbRowGcInterval = TEST_TIME; - expected.db.h2LockTimeout = TEST_TIME; - expected.db.mybatisMaxActiveConnectionCount = 42; - expected.db.mybatisMaxIdleConnectionCount = 42; expected.logStorage.shutdownGracePeriod = TEST_TIME; expected.logStorage.snapshotInterval = TEST_TIME; expected.logStorage.maxLogEntrySize = TEST_DATA; - expected.logStorage.hydrateSnapshotFields = ImmutableSet.of("testing"); expected.backup.backupInterval = TEST_TIME; expected.backup.maxSavedBackups = 42; expected.backup.backupDir = new File("testing"); @@ -225,7 +216,6 @@ public class CommandLineTest { expected.iniShiroRealm.shiroIniPath = testIni; expected.iniShiroRealm.shiroCredentialsMatcher = AllowAllCredentialsMatcher.class; expected.api.enableCorsFor = "testing"; - expected.h2Console.enableH2Console = true; expected.preemptor.enablePreemptor = false; expected.preemptor.preemptionDelay = TEST_TIME; expected.preemptor.preemptionSlotHoldTime = TEST_TIME; @@ -323,17 +313,9 @@ public class CommandLineTest { "-enable_update_affinity=true", "-update_affinity_reservation_hold_time=42days", "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule", - "-use_beta_db_task_store=true", - "-enable_db_metrics=false", - "-slow_query_log_threshold=42days", - "-db_row_gc_interval=42days", - "-db_lock_timeout=42days", - "-db_max_active_connection_count=42", - "-db_max_idle_connection_count=42", "-dlog_shutdown_grace_period=42days", "-dlog_snapshot_interval=42days", "-dlog_max_entry_size=42GB", - "-snapshot_hydrate_stores=testing", "-backup_interval=42days", "-max_saved_backups=42", "-backup_dir=testing", @@ -366,7 +348,6 @@ public class CommandLineTest { "-shiro_credentials_matcher=" + "org.apache.shiro.authc.credential.AllowAllCredentialsMatcher", "-enable_cors_for=testing", - "-enable_h2_console=true", "-enable_preemptor=false", "-preemption_delay=42days", "-preemption_slot_hold_time=42days", http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java index 2b81707..742a97c 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java @@ -33,8 +33,8 @@ import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.easymock.Capture; import org.junit.Before; import org.junit.Test; @@ -65,7 +65,7 @@ public class AuroraCronJobTest extends EasyMockTest { @Before public void setUp() throws Exception { - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); stateManager = createMock(StateManager.class); backoffHelper = createMock(BackoffHelper.class); context = createMock(JobExecutionContext.class); @@ -101,11 +101,11 @@ public class AuroraCronJobTest extends EasyMockTest { populateStorage(CronCollisionPolicy.CANCEL_NEW); auroraCronJob.doExecute(context); - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); populateStorage(CronCollisionPolicy.KILL_EXISTING); auroraCronJob.doExecute(context); - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); populateStorage(CronCollisionPolicy.RUN_OVERLAP); auroraCronJob.doExecute(context); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java index 1ed6a7b..0fabb33 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java @@ -39,9 +39,9 @@ import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Test; import org.quartz.JobExecutionContext; @@ -79,7 +79,7 @@ public class CronIT extends EasyMockTest { @Before public void setUp() throws Exception { stateManager = createMock(StateManager.class); - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); auroraCronJob = createMock(AuroraCronJob.class); injector = Guice.createInjector( http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java index 81440f5..439cf3e 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java @@ -31,9 +31,9 @@ import org.apache.aurora.scheduler.cron.CrontabEntry; import org.apache.aurora.scheduler.cron.SanitizedCronJob; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -59,7 +59,7 @@ public class CronJobManagerImplTest extends EasyMockTest { @Before public void setUp() { - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); scheduler = createMock(Scheduler.class); cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT")); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java index f94b58b..cb037bd 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java @@ -26,9 +26,9 @@ import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Test; @@ -44,7 +44,7 @@ public class MaintenanceTest { @Before public void setUp() { - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); maintenance = new Maintenance(storage); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index af6bdff..7a4525a 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -51,9 +51,9 @@ import org.apache.aurora.scheduler.state.PubsubTestUtil; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; @@ -310,7 +310,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { @Test public void testIgnoresThrottledTasks() throws Exception { // Ensures that tasks in THROTTLED state are not considered part of the active job state. - Storage memStorage = DbUtil.createStorage(); + Storage memStorage = MemStorageModule.newEmptyStorage(); Injector injector = getInjector(memStorage); scheduler = injector.getInstance(TaskScheduler.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java index c56b6f9..8e19794 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java @@ -23,9 +23,9 @@ import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.LockKey; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.state.LockManager.LockException; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.ILock; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -55,7 +55,7 @@ public class LockManagerImplTest extends EasyMockTest { UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class); expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes(); - lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, tokenGenerator); + lockManager = new LockManagerImpl(MemStorageModule.newEmptyStorage(), clock, tokenGenerator); } @Test http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java index 24176fe..0366cd6 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java @@ -51,11 +51,11 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.mesos.v1.Protos.AgentID; import org.easymock.Capture; import org.easymock.EasyMock; @@ -111,7 +111,7 @@ public class StateManagerImplTest extends EasyMockTest { eventSink = createMock(EventSink.class); rescheduleCalculator = createMock(RescheduleCalculator.class); // TODO(William Farner): Use a mocked storage. - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); stateManager = new StateManagerImpl( clock, driver, http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java index d73656d..04350fa 100644 --- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java @@ -35,10 +35,10 @@ import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceTestUtil; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.junit.Before; import org.junit.Test; @@ -74,7 +74,7 @@ public class ResourceCounterTest { @Before public void setUp() throws Exception { - storage = DbUtil.createStorage(); + storage = MemStorageModule.newEmptyStorage(); resourceCounter = new ResourceCounter(storage); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java new file mode 100644 index 0000000..34db54b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java @@ -0,0 +1,176 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage; + +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINED; +import static org.junit.Assert.assertEquals; + +public abstract class AbstractAttributeStoreTest { + + private static final String HOST_A = "hostA"; + private static final String HOST_B = "hostB"; + private static final String SLAVE_A = "slaveA"; + private static final String SLAVE_B = "slaveB"; + private static final Attribute ATTR1 = new Attribute("attr1", ImmutableSet.of("a", "b", "c")); + private static final Attribute ATTR2 = new Attribute("attr2", ImmutableSet.of("d", "e", "f")); + private static final Attribute ATTR3 = new Attribute("attr3", ImmutableSet.of("a", "d", "g")); + private static final IHostAttributes HOST_A_ATTRS = + IHostAttributes.build(new HostAttributes(HOST_A, ImmutableSet.of(ATTR1, ATTR2)) + .setSlaveId(SLAVE_A) + .setAttributes(ImmutableSet.of()) + .setMode(MaintenanceMode.NONE)); + private static final IHostAttributes HOST_B_ATTRS = + IHostAttributes.build(new HostAttributes(HOST_B, ImmutableSet.of(ATTR2, ATTR3)) + .setSlaveId(SLAVE_B) + .setAttributes(ImmutableSet.of()) + .setMode(MaintenanceMode.DRAINING)); + + private Storage storage; + + @Before + public void setUp() throws IOException { + storage = createStorage(); + } + + protected abstract Storage createStorage(); + + @Test + public void testCrud() { + assertEquals(Optional.absent(), read(HOST_A)); + assertEquals(ImmutableSet.of(), readAll()); + + insert(HOST_A_ATTRS); + assertEquals(Optional.of(HOST_A_ATTRS), read(HOST_A)); + assertEquals(ImmutableSet.of(HOST_A_ATTRS), readAll()); + + insert(HOST_B_ATTRS); + insert(HOST_B_ATTRS); // Double insert should be allowed. + assertEquals(Optional.of(HOST_B_ATTRS), read(HOST_B)); + assertEquals(ImmutableSet.of(HOST_A_ATTRS, HOST_B_ATTRS), readAll()); + + IHostAttributes updatedA = IHostAttributes.build( + HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of(ATTR1, ATTR3))); + insert(updatedA); + assertEquals(Optional.of(updatedA), read(HOST_A)); + assertEquals(ImmutableSet.of(updatedA, HOST_B_ATTRS), readAll()); + + IHostAttributes updatedMode = IHostAttributes.build(updatedA.newBuilder().setMode(DRAINED)); + insert(updatedMode); + assertEquals(Optional.of(updatedMode), read(HOST_A)); + assertEquals(ImmutableSet.of(updatedMode, HOST_B_ATTRS), readAll()); + + truncate(); + assertEquals(Optional.absent(), read(HOST_A)); + assertEquals(ImmutableSet.of(), readAll()); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyAttributeValues() { + IHostAttributes attributes = IHostAttributes.build(HOST_A_ATTRS.newBuilder() + .setAttributes(ImmutableSet.of(new Attribute("attr1", ImmutableSet.of())))); + insert(attributes); + } + + @Test + public void testNoAttributes() { + IHostAttributes attributes = IHostAttributes.build( + HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of())); + insert(attributes); + assertEquals(Optional.of(attributes), read(HOST_A)); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoMode() { + HostAttributes noMode = HOST_A_ATTRS.newBuilder(); + noMode.unsetMode(); + + insert(IHostAttributes.build(noMode)); + } + + @Test + public void testSaveAttributesEmpty() { + HostAttributes attributes = HOST_A_ATTRS.newBuilder(); + attributes.unsetAttributes(); + + insert(IHostAttributes.build(attributes)); + assertEquals(Optional.of(IHostAttributes.build(attributes)), read(HOST_A)); + } + + @Test + public void testSlaveIdChanges() { + insert(HOST_A_ATTRS); + IHostAttributes updated = IHostAttributes.build(HOST_A_ATTRS.newBuilder().setSlaveId(SLAVE_B)); + insert(updated); + assertEquals(Optional.of(updated), read(HOST_A)); + } + + @Test + public void testUpdateAttributesWithRelations() { + // Test for regression of AURORA-1379, where host attribute mutation performed a delete, + // violating foreign key constraints. + insert(HOST_A_ATTRS); + + ScheduledTask builder = TaskTestUtil.makeTask("a", JobKeys.from("role", "env", "job")) + .newBuilder(); + builder.getAssignedTask() + .setSlaveHost(HOST_A_ATTRS.getHost()) + .setSlaveId(HOST_A_ATTRS.getSlaveId()); + final IScheduledTask taskA = IScheduledTask.build(builder); + + storage.write((NoResult.Quiet) + storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA))); + + HostAttributes attributeBuilder = HOST_A_ATTRS.newBuilder().setMode(DRAINED); + attributeBuilder.addToAttributes(new Attribute("newAttr", ImmutableSet.of("a", "b"))); + IHostAttributes hostAUpdated = IHostAttributes.build(attributeBuilder); + insert(hostAUpdated); + assertEquals(Optional.of(hostAUpdated), read(HOST_A)); + } + + private void insert(IHostAttributes attributes) { + storage.write( + storeProvider -> storeProvider.getAttributeStore().saveHostAttributes(attributes)); + } + + private Optional read(String host) { + return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host)); + } + + private Set readAll() { + return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes()); + } + + private void truncate() { + storage.write( + (NoResult.Quiet) storeProvider -> storeProvider.getAttributeStore().deleteHostAttributes()); + } +}