Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 50538106AA for ; Thu, 8 Jan 2015 01:06:01 +0000 (UTC) Received: (qmail 82820 invoked by uid 500); 8 Jan 2015 01:06:02 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 82785 invoked by uid 500); 8 Jan 2015 01:06:02 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 82775 invoked by uid 99); 8 Jan 2015 01:06:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jan 2015 01:06:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 08 Jan 2015 01:05:38 +0000 Received: (qmail 82585 invoked by uid 99); 8 Jan 2015 01:05:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jan 2015 01:05:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 90E05A272A7; Thu, 8 Jan 2015 01:05:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-aurora git commit: Removing cron schedule support from createJob and killTasks Date: Thu, 8 Jan 2015 01:05:34 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 7449e34d8 -> fb4d3f9ac Removing cron schedule support from createJob and killTasks Bugs closed: AURORA-454, AURORA-976 Reviewed at https://reviews.apache.org/r/29271/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/fb4d3f9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/fb4d3f9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/fb4d3f9a Branch: refs/heads/master Commit: fb4d3f9ac09eda54fd5259ef8b84669fa8cff864 Parents: 7449e34 Author: Maxim Khutornenko Authored: Wed Jan 7 17:01:36 2015 -0800 Committer: -l Committed: Wed Jan 7 17:01:36 2015 -0800 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/base/Query.java | 17 --- .../thrift/SchedulerThriftInterface.java | 138 ++++++++++--------- .../thrift/SchedulerThriftInterfaceTest.java | 85 +++++------- 3 files changed, 109 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/base/Query.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java index a6ff14a..458530f 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Query.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java @@ -15,12 +15,9 @@ package org.apache.aurora.scheduler.base; import java.util.EnumSet; import java.util.Objects; -import java.util.Set; -import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import org.apache.aurora.gen.ScheduleStatus; @@ -56,20 +53,6 @@ public final class Query { return q.isSetRole() && q.isSetEnvironment() && q.isSetJobName() || q.isSetJobKeys(); } - /** - * Checks whether a query is strictly scoped to a specific job. A query is strictly job scoped, - * iff the only fields that are set in the query are: role, environment and job name. - * - * @param query Query to test. - * @return {@code true} if the query is strictly single job scoped, otherwise {@code false}. - */ - public static boolean isSingleJobScoped(Builder query) { - Optional> jobKey = JobKeys.from(query); - return jobKey.isPresent() - && jobKey.get().size() == 1 - && Query.jobScoped(Iterables.getOnlyElement(jobKey.get())).equals(query); - } - public static Builder arbitrary(TaskQuery query) { return new Builder(query.deepCopy()); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 6e3caa1..ac92959 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -286,6 +286,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { return errorResponse(INVALID_REQUEST, e); } + if (sanitized.isCron()) { + return invalidResponse(NO_CRON); + } + return storage.write(new MutateWork.Quiet() { @Override public Response apply(MutableStoreProvider storeProvider) { @@ -296,84 +300,97 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { ILockKey.build(LockKey.job(job.getKey().newBuilder())), Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); - if (!storeProvider.getTaskStore().fetchTasks( - Query.jobScoped(job.getKey()).active()).isEmpty() - || cronJobManager.hasJob(job.getKey())) { - - return invalidResponse("Job already exists: " + JobKeys.canonicalString(job.getKey())); - } + checkJobExists(storeProvider, job.getKey()); ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); int count = sanitized.getJobConfig().getInstanceCount(); validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count)); - // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454) - if (sanitized.isCron()) { - LOG.warning("Deprecated behavior: scheduling job " + job.getKey() - + " with cron via createJob (AURORA_454)"); - cronJobManager.createJob(SanitizedCronJob.from(sanitized)); - } else { - LOG.info("Launching " + count + " tasks."); - stateManager.insertPendingTasks( - storeProvider, - template, - sanitized.getInstanceIds()); - } + LOG.info("Launching " + count + " tasks."); + stateManager.insertPendingTasks( + storeProvider, + template, + sanitized.getInstanceIds()); + return okEmptyResponse(); } catch (LockException e) { return errorResponse(LOCK_ERROR, e); - } catch (CronException | TaskValidationException e) { + } catch (JobExistsException | TaskValidationException e) { return errorResponse(INVALID_REQUEST, e); } } }); } + private static class JobExistsException extends Exception { + public JobExistsException(String message) { + super(message); + } + } + + private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException { + if (!store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()).isEmpty() + || cronJobManager.hasJob(jobKey)) { + + throw new JobExistsException(jobAlreadyExistsMessage(jobKey)); + } + } + private Response createOrUpdateCronTemplate( JobConfiguration mutableJob, - @Nullable Lock mutableLock, + @Nullable final Lock mutableLock, SessionKey session, - boolean updateOnly) { + final boolean updateOnly) { IJobConfiguration job = IJobConfiguration.build(mutableJob); - IJobKey jobKey = JobKeys.assertValid(job.getKey()); + final IJobKey jobKey = JobKeys.assertValid(job.getKey()); requireNonNull(session); + final SanitizedConfiguration sanitized; try { sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole())); + sanitized = SanitizedConfiguration.fromUnsanitized(job); + } catch (AuthFailedException e) { + return errorResponse(AUTH_FAILED, e); + } catch (TaskDescriptionException e) { + return errorResponse(INVALID_REQUEST, e); + } - SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job); + if (!sanitized.isCron()) { + return invalidResponse(noCronScheduleMessage(jobKey)); + } - lockManager.validateIfLocked( - ILockKey.build(LockKey.job(jobKey.newBuilder())), - Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); + return storage.write(new MutateWork.Quiet() { + @Override + public Response apply(MutableStoreProvider storeProvider) { + try { + lockManager.validateIfLocked( + ILockKey.build(LockKey.job(jobKey.newBuilder())), + Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); - if (!sanitized.isCron()) { - return invalidResponse(noCronScheduleMessage(jobKey)); - } + ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); + int count = sanitized.getJobConfig().getInstanceCount(); - ITaskConfig template = sanitized.getJobConfig().getTaskConfig(); - int count = sanitized.getJobConfig().getInstanceCount(); + validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count)); - validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count)); + // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob + if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) { + // The job already has a schedule: so update it. + cronJobManager.updateJob(SanitizedCronJob.from(sanitized)); + } else { + checkJobExists(storeProvider, jobKey); + cronJobManager.createJob(SanitizedCronJob.from(sanitized)); + } - // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob - if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) { - // The job already has a schedule: so update it. - cronJobManager.updateJob(SanitizedCronJob.from(sanitized)); - } else { - cronJobManager.createJob(SanitizedCronJob.from(sanitized)); + return okEmptyResponse(); + } catch (LockException e) { + return errorResponse(LOCK_ERROR, e); + } catch (JobExistsException | TaskValidationException | CronException e) { + return errorResponse(INVALID_REQUEST, e); + } } - - return okEmptyResponse(); - } catch (AuthFailedException e) { - return errorResponse(AUTH_FAILED, e); - } catch (LockException e) { - return errorResponse(LOCK_ERROR, e); - } catch (TaskDescriptionException | TaskValidationException | CronException e) { - return errorResponse(INVALID_REQUEST, e); - } + }); } @Override @@ -736,9 +753,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { public Response apply(MutableStoreProvider storeProvider) { Query.Builder query = Query.arbitrary(mutableQuery); - // Check single job scoping before adding statuses. - boolean isSingleJobScoped = Query.isSingleJobScoped(query); - // Unless statuses were specifically supplied, only attempt to kill active tasks. query = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES); @@ -767,19 +781,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { LOG.info("Killing tasks matching " + query); - final boolean cronJobKilled; - if (isSingleJobScoped) { - // If this looks like a query for all tasks in a job, instruct the cron - // scheduler to delete it. - // TODO(mchucarroll): deprecate cron as a part of create/kill job. (AURORA-454) - IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get()); - LOG.warning("Deprecated behavior: descheduling job " + jobKey - + " with cron via killTasks. (See AURORA-454)"); - cronJobKilled = cronJobManager.deleteJob(jobKey); - } else { - cronJobKilled = false; - } - boolean tasksKilled = false; for (String taskId : Tasks.ids(tasks)) { tasksKilled |= stateManager.changeState( @@ -790,7 +791,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { killedByMessage(context.getIdentity())); } - return cronJobKilled || tasksKilled + return tasksKilled ? okEmptyResponse() : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE); } @@ -1373,7 +1374,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder())); if (cronJobManager.hasJob(job)) { - return invalidResponse(NO_CRON_UPDATES); + return invalidResponse(NO_CRON); } } catch (AuthFailedException e) { return errorResponse(AUTH_FAILED, e); @@ -1562,13 +1563,18 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface { } @VisibleForTesting + static String jobAlreadyExistsMessage(IJobKey jobKey) { + return String.format("Job %s already exists", JobKeys.canonicalString(jobKey)); + } + + @VisibleForTesting static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill."; @VisibleForTesting static final String NOOP_JOB_UPDATE_MESSAGE = "Job is unchanged by proposed update."; @VisibleForTesting - static final String NO_CRON_UPDATES = "Cron jobs may only be updated by calling scheduleCronJob."; + static final String NO_CRON = "Cron jobs may only be created/updated by calling scheduleCronJob."; private static Response okEmptyResponse() { return emptyResponse().setResponseCode(OK); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java index 8d41e70..ad9126c 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java @@ -169,7 +169,8 @@ import static org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryExcept import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASKS_PER_JOB; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASK_ID_LENGTH; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_JOB_UPDATE_MESSAGE; -import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON_UPDATES; +import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON; +import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.jobAlreadyExistsMessage; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.noCronScheduleMessage; import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.notScheduledCronMessage; @@ -387,42 +388,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { } @Test - public void testCreateCronJob() throws Exception { - IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule(CRON_SCHEDULE)); - SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job); - expectAuth(ROLE, true); - lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK)); - storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); - expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false); - expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1)) - .andReturn(TASK_ID); - expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1)) - .andReturn(ENOUGH_QUOTA); - - cronJobManager.createJob(anyObject(SanitizedCronJob.class)); - - control.replay(); - - assertOkResponse(thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION)); - } - - @Test - public void testRejectCronJobEmptyCronSchedule() throws Exception { + public void testCreateJobFailsForCron() throws Exception { IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule("")); - SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job); expectAuth(ROLE, true); - lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK)); - storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); - expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false); - expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1)) - .andReturn(TASK_ID); - expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1)) - .andReturn(ENOUGH_QUOTA); control.replay(); assertEquals( - invalidResponse(SanitizedCronJob.NO_CRON_SCHEDULE), + invalidResponse(NO_CRON), thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION)); } @@ -799,7 +772,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { Query.Builder query = Query.unscoped().byJob(JOB_KEY); expectAuth(ROOT, true); storageUtil.expectTaskFetch(query.active(), buildScheduledTask()); - expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(false); lockManager.validateIfLocked(LOCK_KEY, Optional.absent()); expectTransitionsToKilling(); @@ -809,18 +781,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { } @Test - public void testKillCronJob() throws Exception { - Query.Builder query = Query.jobScoped(JOB_KEY); - expectAuth(ROOT, true); - storageUtil.expectTaskFetch(query.active()); - expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(true); - - control.replay(); - - assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION)); - } - - @Test public void testKillTasksLockCheckFailed() throws Exception { Query.Builder query = Query.unscoped().byJob(JOB_KEY).active(); IScheduledTask task2 = buildScheduledTask("job_bar", TASK_ID); @@ -900,7 +860,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { storageUtil.expectTaskFetch(query.active()); expectAuth(ImmutableSet.of("role"), true); - expect(cronJobManager.deleteJob(key)).andReturn(true); control.replay(); assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION)); @@ -1273,13 +1232,34 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1)) .andReturn(ENOUGH_QUOTA); - expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false); + expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false).times(2); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active()); cronJobManager.createJob(SanitizedCronJob.from(sanitized)); control.replay(); assertResponse(OK, thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION)); } @Test + public void testScheduleCronFailsCreationDueToExistingNonCron() throws Exception { + expectAuth(ROLE, true); + lockManager.validateIfLocked(LOCK_KEY, Optional.absent()); + SanitizedConfiguration sanitized = + SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB)); + + expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1)) + .andReturn(TASK_ID); + expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1)) + .andReturn(ENOUGH_QUOTA); + + expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false); + storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), buildScheduledTask()); + control.replay(); + assertEquals( + invalidResponse(jobAlreadyExistsMessage(JOB_KEY)), + thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION)); + } + + @Test public void testScheduleCronUpdatesJob() throws Exception { expectAuth(ROLE, true); lockManager.validateIfLocked(LOCK_KEY, Optional.absent()); @@ -1309,6 +1289,16 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { } @Test + public void testScheduleCronJobFailedTaskConfigValidation() throws Exception { + expectAuth(ROLE, true); + control.replay(); + IJobConfiguration job = IJobConfiguration.build(makeJob(null)); + assertResponse( + INVALID_REQUEST, + thrift.scheduleCronJob(job.newBuilder(), DEFAULT_LOCK, SESSION)); + } + + @Test public void testScheduleCronJobFailsLockValidation() throws Exception { expectAuth(ROLE, true); lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK)); @@ -1320,7 +1310,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { @Test public void testScheduleCronJobFailsWithNoCronSchedule() throws Exception { expectAuth(ROLE, true); - lockManager.validateIfLocked(LOCK_KEY, Optional.absent()); control.replay(); assertEquals( @@ -2696,7 +2685,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest { expect(cronJobManager.hasJob(JOB_KEY)).andReturn(true); control.replay(); - assertEquals(invalidResponse(NO_CRON_UPDATES), thrift.startJobUpdate(request, SESSION)); + assertEquals(invalidResponse(NO_CRON), thrift.startJobUpdate(request, SESSION)); } @Test