aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevi...@apache.org
Subject [2/5] Revert "AURORA-132: Cron system based on Quartz"
Date Thu, 24 Apr 2014 01:18:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
deleted file mode 100644
index e7d1c14..0000000
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.quartz;
-
-import java.util.Map;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffHelper;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.quartz.JobExecutionException;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class AuroraCronJobTest extends EasyMockTest {
-  public static final String TASK_ID = "A";
-  private Storage storage;
-  private StateManager stateManager;
-  private CronJobManager cronJobManager;
-  private BackoffHelper backoffHelper;
-
-  private AuroraCronJob auroraCronJob;
-
-  private static final String MANAGER_ID = "MANAGER_ID";
-
-  @Before
-  public void setUp() {
-    storage = MemStorage.newEmptyStorage();
-    stateManager = createMock(StateManager.class);
-    cronJobManager = createMock(CronJobManager.class);
-    backoffHelper = createMock(BackoffHelper.class);
-
-    auroraCronJob = new AuroraCronJob(
-        new AuroraCronJob.Config(backoffHelper), storage, stateManager, cronJobManager);
-    expect(cronJobManager.getManagerKey()).andStubReturn(MANAGER_ID);
-  }
-
-  @Test
-  public void testExecuteNonexistentIsNoop() throws JobExecutionException {
-    control.replay();
-
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test
-  public void testInvalidConfigIsNoop() throws JobExecutionException {
-    control.replay();
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
-        storeProvider.getJobStore().saveAcceptedJob(
-            MANAGER_ID,
-            IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder().setCronSchedule(null)));
-      }
-    });
-
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test
-  public void testEmptyStorage() throws JobExecutionException {
-    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
-    expectLastCall().times(3);
-
-    control.replay();
-    populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = MemStorage.newEmptyStorage();
-
-    populateStorage(CronCollisionPolicy.KILL_EXISTING);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = MemStorage.newEmptyStorage();
-
-    populateStorage(CronCollisionPolicy.RUN_OVERLAP);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test
-  public void testCancelNew() throws JobExecutionException {
-    control.replay();
-
-    populateTaskStore();
-    populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test
-  public void testKillExisting() throws Exception {
-    Capture<Supplier<Boolean>> capture = createCapture();
-
-    expect(stateManager.changeState(
-        TASK_ID,
-        Optional.<ScheduleStatus>absent(),
-        ScheduleStatus.KILLING,
-        AuroraCronJob.KILL_AUDIT_MESSAGE))
-        .andReturn(true);
-    backoffHelper.doUntilSuccess(EasyMock.capture(capture));
-    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
-
-    control.replay();
-
-    populateStorage(CronCollisionPolicy.KILL_EXISTING);
-    populateTaskStore();
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    assertFalse(capture.getValue().get());
-    storage.write(
-        new Storage.MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(Storage.MutableStoreProvider storeProvider) {
-            storeProvider.getUnsafeTaskStore().deleteAllTasks();
-          }
-        });
-    assertTrue(capture.getValue().get());
-  }
-
-  private void populateTaskStore() {
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
-            IScheduledTask.build(new ScheduledTask()
-                .setStatus(ScheduleStatus.RUNNING)
-                .setAssignedTask(new AssignedTask()
-                    .setTaskId(TASK_ID)
-                    .setTask(QuartzTestUtil.JOB.getTaskConfig().newBuilder())))
-        ));
-      }
-    });
-  }
-
-  private void populateStorage(final CronCollisionPolicy policy) {
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(Storage.MutableStoreProvider storeProvider) {
-        storeProvider.getJobStore().saveAcceptedJob(
-            MANAGER_ID,
-            QuartzTestUtil.makeSanitizedCronJob(policy).getSanitizedConfig().getJobConfig());
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
deleted file mode 100644
index 21e8278..0000000
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.quartz;
-
-import java.util.concurrent.CountDownLatch;
-
-import com.google.common.util.concurrent.Service;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-import com.google.inject.util.Modules;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.easymock.Capture;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.quartz.JobExecutionContext;
-import org.quartz.Scheduler;
-import org.quartz.Trigger;
-import org.quartz.TriggerListener;
-
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class CronIT extends EasyMockTest {
-  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
-
-  private static final IJobKey JOB_KEY = JobKeys.from("roll", "b", "c");
-  private static final Identity IDENTITY = new Identity()
-      .setRole(JOB_KEY.getRole())
-      .setUser("user");
-
-  private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(
-      new JobConfiguration()
-          .setCronSchedule(CRONTAB_ENTRY.toString())
-          .setKey(JOB_KEY.newBuilder())
-          .setInstanceCount(2)
-          .setOwner(IDENTITY)
-          .setTaskConfig(new TaskConfig()
-              .setJobName(JOB_KEY.getName())
-              .setEnvironment(JOB_KEY.getEnvironment())
-              .setOwner(IDENTITY)
-              .setExecutorConfig(new ExecutorConfig()
-                  .setName("cmd.exe")
-                  .setData("echo hello world"))
-              .setNumCpus(7)
-              .setRamMb(8)
-              .setDiskMb(9))
-  );
-
-  private ShutdownRegistry shutdownRegistry;
-  private EventSink eventSink;
-  private Injector injector;
-  private StateManager stateManager;
-  private Storage storage;
-  private AuroraCronJob auroraCronJob;
-
-  private Capture<ExceptionalCommand<?>> shutdown;
-
-  @Before
-  public void setUp() throws Exception {
-    shutdownRegistry = createMock(ShutdownRegistry.class);
-    stateManager = createMock(StateManager.class);
-    storage = MemStorage.newEmptyStorage();
-    auroraCronJob = createMock(AuroraCronJob.class);
-
-    injector = Guice.createInjector(
-        // Override to verify that Guice is actually used for construction of the AuroraCronJob.
-        // TODO(ksweeney): Use the production class here.
-        Modules.override(new CronModule()).with(new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(AuroraCronJob.class).toInstance(auroraCronJob);
-          }
-        }), new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-            bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-            bind(StateManager.class).toInstance(stateManager);
-            bind(Storage.class).toInstance(storage);
-
-            PubsubTestUtil.installPubsub(binder());
-          }
-        });
-    eventSink = PubsubTestUtil.startPubsub(injector);
-
-    shutdown = createCapture();
-    shutdownRegistry.addAction(capture(shutdown));
-  }
-
-  private void boot() {
-    eventSink.post(new PubsubEvent.SchedulerActive());
-  }
-
-  @Test
-  public void testCronSchedulerLifecycle() throws Exception {
-    control.replay();
-
-    Scheduler scheduler = injector.getInstance(Scheduler.class);
-    assertTrue(!scheduler.isStarted());
-
-    boot();
-    Service cronLifecycle = injector.getInstance(CronLifecycle.class);
-
-    assertTrue(cronLifecycle.isRunning());
-    assertTrue(scheduler.isStarted());
-
-    shutdown.getValue().execute();
-
-    assertTrue(!cronLifecycle.isRunning());
-    assertTrue(scheduler.isShutdown());
-  }
-
-  @Test
-  public void testJobsAreScheduled() throws Exception {
-    auroraCronJob.execute(isA(JobExecutionContext.class));
-
-    control.replay();
-    final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
-    final Scheduler scheduler = injector.getInstance(Scheduler.class);
-
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(Storage.MutableStoreProvider storeProvider) {
-        storeProvider.getJobStore().saveAcceptedJob(
-            cronJobManager.getManagerKey(),
-            CRON_JOB);
-      }
-    });
-
-    final CountDownLatch cronRan = new CountDownLatch(1);
-    scheduler.getListenerManager().addTriggerListener(new CountDownWhenComplete(cronRan));
-    boot();
-
-    cronRan.await();
-
-    shutdown.getValue().execute();
-  }
-
-  @Test
-  public void testKillExistingDogpiles() throws Exception {
-    // Test that a trigger for a job that hasn't finished running is ignored.
-    final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
-
-    final CountDownLatch firstExecutionTriggered = new CountDownLatch(1);
-    final CountDownLatch firstExecutionCompleted = new CountDownLatch(1);
-    final CountDownLatch secondExecutionTriggered = new CountDownLatch(1);
-    final CountDownLatch secondExecutionCompleted = new CountDownLatch(1);
-
-    auroraCronJob.execute(isA(JobExecutionContext.class));
-    expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override
-      public Void answer() throws Throwable {
-        firstExecutionTriggered.countDown();
-        firstExecutionCompleted.await();
-        return null;
-      }
-    });
-    auroraCronJob.execute(isA(JobExecutionContext.class));
-    expectLastCall().andAnswer(new IAnswer<Object>() {
-      @Override
-      public Void answer() throws Throwable {
-        secondExecutionTriggered.countDown();
-        secondExecutionCompleted.await();
-        return null;
-      }
-    });
-
-    control.replay();
-
-    boot();
-
-    cronJobManager.createJob(SanitizedCronJob.fromUnsanitized(CRON_JOB));
-    cronJobManager.startJobNow(JOB_KEY);
-    firstExecutionTriggered.await();
-    cronJobManager.startJobNow(JOB_KEY);
-    assertEquals(1, secondExecutionTriggered.getCount());
-    firstExecutionCompleted.countDown();
-    secondExecutionTriggered.await();
-    secondExecutionTriggered.countDown();
-  }
-
-  private static class CountDownWhenComplete implements TriggerListener {
-    private final CountDownLatch countDownLatch;
-
-    CountDownWhenComplete(CountDownLatch countDownLatch) {
-      this.countDownLatch = countDownLatch;
-    }
-
-    @Override
-    public String getName() {
-      return CountDownWhenComplete.class.getName();
-    }
-
-    @Override
-    public void triggerFired(Trigger trigger, JobExecutionContext context) {
-    }
-
-    @Override
-    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
-      return false;
-    }
-
-    @Override
-    public void triggerMisfired(Trigger trigger) {
-      // No-op.
-    }
-
-    @Override
-    public void triggerComplete(
-        Trigger trigger,
-        JobExecutionContext context,
-        Trigger.CompletedExecutionInstruction triggerInstructionCode) {
-
-      countDownLatch.countDown();
-      // No-op.
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
deleted file mode 100644
index b9116a4..0000000
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.quartz;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.TimeZone;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.Trigger;
-import org.quartz.impl.matchers.GroupMatcher;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class CronJobManagerImplTest extends EasyMockTest {
-  private Storage storage;
-  private Scheduler scheduler;
-
-  private CronJobManager cronJobManager;
-
-  @Before
-  public void setUp() {
-    storage = MemStorage.newEmptyStorage();
-    scheduler = createMock(Scheduler.class);
-
-    cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));
-  }
-
-  @Test
-  public void testStartJobNowExistent() throws Exception {
-    populateStorage();
-    scheduler.triggerJob(QuartzTestUtil.QUARTZ_JOB_KEY);
-
-    control.replay();
-
-    cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test(expected = CronException.class)
-  public void testStartJobNowNonexistent() throws Exception {
-    control.replay();
-
-    cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
-  }
-
-  @Test
-  public void testUpdateExistingJob() throws Exception {
-    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
-
-    expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
-    expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
-       .andReturn(null);
-
-    populateStorage();
-
-    control.replay();
-
-    cronJobManager.updateJob(sanitizedCronJob);
-    assertEquals(sanitizedCronJob.getSanitizedConfig().getJobConfig(), fetchFromStorage().orNull());
-  }
-
-  @Test
-  public void testUpdateNonexistentJob() throws Exception {
-    control.replay();
-
-    try {
-      cronJobManager.updateJob(QuartzTestUtil.makeUpdatedJob());
-      fail();
-    } catch (CronException e) {
-      // Expected.
-    }
-
-    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
-  }
-
-  @Test
-  public void testCreateNonexistentJob() throws Exception {
-    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
-
-    expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
-        .andReturn(null);
-
-    control.replay();
-
-    cronJobManager.createJob(sanitizedCronJob);
-
-    assertEquals(
-        sanitizedCronJob.getSanitizedConfig().getJobConfig(),
-        fetchFromStorage().orNull());
-  }
-
-  @Test(expected = CronException.class)
-  public void testCreateExistingJobFails() throws Exception {
-    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
-    populateStorage();
-    control.replay();
-
-    cronJobManager.createJob(sanitizedCronJob);
-  }
-
-  @Test
-  public void testGetJobs() throws Exception {
-    control.replay();
-    assertEquals(Collections.emptyList(), ImmutableList.copyOf(cronJobManager.getJobs()));
-
-    populateStorage();
-    assertEquals(
-        QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig(),
-        Iterables.getOnlyElement(cronJobManager.getJobs()));
-  }
-
-  @Test
-  public void testNoRunOverlap() throws Exception {
-    SanitizedCronJob runOverlapJob = SanitizedCronJob.fromUnsanitized(
-        IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder()
-            .setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP)));
-
-    control.replay();
-
-    try {
-      cronJobManager.createJob(runOverlapJob);
-      fail();
-    } catch (CronException e) {
-      // Expected.
-    }
-
-    try {
-      cronJobManager.updateJob(runOverlapJob);
-    } catch (CronException e) {
-      // Expected.
-    }
-
-    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
-  }
-
-  @Test
-  public void testDeleteJob() throws Exception {
-    expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
-
-    control.replay();
-
-    assertFalse(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
-    populateStorage();
-    assertTrue(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
-    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
-  }
-
-  @Test
-  public void testGetScheduledJobs() throws Exception {
-    JobDetail jobDetail = createMock(JobDetail.class);
-    expect(scheduler.getJobKeys(EasyMock.<GroupMatcher<JobKey>>anyObject()))
-        .andReturn(ImmutableSet.of(QuartzTestUtil.QUARTZ_JOB_KEY));
-    expect(scheduler.getJobDetail(QuartzTestUtil.QUARTZ_JOB_KEY))
-        .andReturn(jobDetail);
-    expect(jobDetail.getDescription()).andReturn("* * * * *");
-
-    control.replay();
-
-    Map<IJobKey, CrontabEntry> scheduledJobs = cronJobManager.getScheduledJobs();
-    assertEquals(CrontabEntry.parse("* * * * *"), scheduledJobs.get(QuartzTestUtil.AURORA_JOB_KEY));
-  }
-
-  private void populateStorage() throws Exception {
-    storage.write(new Storage.MutateWork.NoResult<Exception>() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) throws Exception {
-        storeProvider.getJobStore().saveAcceptedJob(
-            cronJobManager.getManagerKey(),
-            QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig());
-      }
-    });
-  }
-
-  private Optional<IJobConfiguration> fetchFromStorage() {
-    return storage.consistentRead(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
-      @Override
-      public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(),
-            QuartzTestUtil.AURORA_JOB_KEY);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
deleted file mode 100644
index 6bc97f3..0000000
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.quartz;
-
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-
-import org.apache.aurora.scheduler.cron.ExpectedPrediction;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class CronPredictorImplTest {
-  private static final TimeZone TIME_ZONE = TimeZone.getTimeZone("GMT");
-  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
-
-  private CronPredictor cronPredictor;
-
-  private FakeClock clock;
-
-  @Before
-  public void setUp() {
-    clock = new FakeClock();
-    cronPredictor = new CronPredictorImpl(clock, TIME_ZONE);
-  }
-
-  @Test
-  public void testValidSchedule() {
-    clock.advance(Amount.of(1L, Time.DAYS));
-    Date expectedPrediction = new Date(Amount.of(1L, Time.DAYS).as(Time.MILLISECONDS)
-            + Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS));
-    assertEquals(expectedPrediction, cronPredictor.predictNextRun(CrontabEntry.parse("* * * * *")));
-  }
-
-  @Test
-  public void testCronExpressions() {
-    assertEquals("0 * * ? * 1,2,3,4,5,6,7",
-        Quartz.cronExpression(CRONTAB_ENTRY, TIME_ZONE).getCronExpression());
-  }
-
-  @Test
-  public void testCronPredictorConforms() throws Exception {
-    for (ExpectedPrediction expectedPrediction : ExpectedPrediction.getAll()) {
-      List<Date> results = Lists.newArrayList();
-      clock.setNowMillis(0);
-      for (int i = 0; i < expectedPrediction.getTriggerTimes().size(); i++) {
-        Date nextTriggerTime = cronPredictor.predictNextRun(expectedPrediction.parseCrontabEntry());
-        results.add(nextTriggerTime);
-        clock.setNowMillis(nextTriggerTime.getTime());
-      }
-      assertEquals(
-          "Cron schedule " + expectedPrediction.getSchedule() + " made unexpected predictions.",
-          Lists.transform(
-              expectedPrediction.getTriggerTimes(),
-              new Function<Long, Date>() {
-                @Override
-                public Date apply(Long time) {
-                  return new Date(time);
-                }
-              }
-          ),
-          results);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
deleted file mode 100644
index b10f514..0000000
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.quartz;
-
-import com.google.common.base.Throwables;
-
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.quartz.JobKey;
-
-/**
- * Fixtures used across quartz tests.
- */
-final class QuartzTestUtil {
-  static final IJobKey AURORA_JOB_KEY = JobKeys.from("role", "env", "job");
-  static final IJobConfiguration JOB = IJobConfiguration.build(
-      new JobConfiguration()
-          .setCronSchedule("* * * * SUN")
-          .setInstanceCount(10)
-          .setOwner(new Identity("role", "user"))
-          .setKey(AURORA_JOB_KEY.newBuilder())
-          .setTaskConfig(new TaskConfig()
-              .setOwner(new Identity("role", "user"))
-              .setJobName(AURORA_JOB_KEY.getName())
-              .setEnvironment(AURORA_JOB_KEY.getEnvironment())
-              .setDiskMb(3)
-              .setRamMb(4)
-              .setNumCpus(5)
-              .setExecutorConfig(new ExecutorConfig()
-                  .setName("cmd.exe")
-                  .setData("echo hello world")))
-  );
-  static final JobKey QUARTZ_JOB_KEY = Quartz.jobKey(AURORA_JOB_KEY);
-
-  private QuartzTestUtil() {
-    // Utility class.
-  }
-
-  static SanitizedCronJob makeSanitizedCronJob(CronCollisionPolicy collisionPolicy) {
-    try {
-      return SanitizedCronJob.fromUnsanitized(
-          IJobConfiguration.build(JOB.newBuilder().setCronCollisionPolicy(collisionPolicy)));
-    } catch (CronException | ConfigurationManager.TaskDescriptionException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  static SanitizedCronJob makeSanitizedCronJob() {
-    return makeSanitizedCronJob(CronCollisionPolicy.KILL_EXISTING);
-  }
-
-  static SanitizedCronJob makeUpdatedJob() throws Exception {
-    return SanitizedCronJob.fromUnsanitized(
-        IJobConfiguration.build(JOB.newBuilder().setCronSchedule("* * 1 * *")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 2857f35..d7dbfaa 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -38,6 +38,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
@@ -66,10 +68,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
@@ -86,7 +85,6 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,10 +106,9 @@ import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFIC
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.reportMatcher;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -131,23 +128,25 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private static final IJobKey KEY_A = JobKeys.from(ROLE_A, ENV_A, JOB_A);
   private static final int ONE_GB = 1024;
 
+  private static final String ROLE_B = "Test_Role_B";
+  private static final IJobKey KEY_B = JobKeys.from(ROLE_B, ENV_A, JOB_A);
+
   private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
   private static final String SLAVE_HOST_1 = "SlaveHost1";
 
   private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
 
-  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("1 1 1 1 *");
-  public static final String RAW_CRONTAB_ENTRY = CRONTAB_ENTRY.toString();
-
   private Driver driver;
   private StateManagerImpl stateManager;
   private Storage storage;
   private SchedulerCoreImpl scheduler;
-  private CronJobManager cronJobManager;
+  private CronScheduler cronScheduler;
+  private CronJobManager cron;
   private FakeClock clock;
   private EventSink eventSink;
   private RescheduleCalculator rescheduleCalculator;
+  private ShutdownRegistry shutdownRegistry;
   private QuotaManager quotaManager;
 
   // TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
@@ -165,15 +164,18 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     clock = new FakeClock();
     eventSink = createMock(EventSink.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
-    cronJobManager = createMock(CronJobManager.class);
+    cronScheduler = createMock(CronScheduler.class);
+    shutdownRegistry = createMock(ShutdownRegistry.class);
     quotaManager = createMock(QuotaManager.class);
 
     eventSink.post(EasyMock.<PubsubEvent>anyObject());
     expectLastCall().anyTimes();
+    expect(cronScheduler.schedule(anyObject(String.class), anyObject(Runnable.class)))
+        .andStubReturn("key");
+    expect(cronScheduler.isValidSchedule(anyObject(String.class))).andStubReturn(true);
 
     expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
         .andStubReturn(ENOUGH_QUOTA);
-    expect(cronJobManager.hasJob(anyObject(IJobKey.class))).andStubReturn(false);
   }
 
   /**
@@ -206,9 +208,15 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         taskIdGenerator,
         eventSink,
         rescheduleCalculator);
+    cron = new CronJobManager(
+        stateManager,
+        storage,
+        cronScheduler,
+        shutdownRegistry,
+        MoreExecutors.sameThreadExecutor());
     scheduler = new SchedulerCoreImpl(
         storage,
-        cronJobManager,
+        cron,
         stateManager,
         taskIdGenerator,
         quotaManager);
@@ -264,7 +272,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     buildScheduler();
 
     TaskConfig newTask = nonProductionTask();
-    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.canonicalString(KEY_A))));
+    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.toPath(KEY_A))));
     scheduler.createJob(makeJob(KEY_A, newTask));
     assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
   }
@@ -437,6 +445,147 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(makeJob(KEY_A, 1));
   }
 
+  @Test(expected = ScheduleException.class)
+  public void testCreateDuplicateCronJob() throws Exception {
+    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
+
+    control.replay();
+    buildScheduler();
+
+    // Cron jobs are scheduled on a delay, so this job's tasks will not be scheduled immediately,
+    // but duplicate jobs should still be rejected.
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+
+    scheduler.createJob(makeJob(KEY_A, 1));
+  }
+
+  @Test
+  public void testStartCronJob() throws Exception {
+    // Create a cron job, ask the scheduler to start it, and ensure that the tasks exist
+    // in the PENDING state.
+
+    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
+    IJobKey jobKey = sanitizedConfiguration.getJobConfig().getKey();
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+
+    cron.startJobNow(jobKey);
+    assertEquals(PENDING, getOnlyTask(Query.jobScoped(jobKey)).getStatus());
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testStartNonexistentCronJob() throws Exception {
+    // Try to start a cron job that doesn't exist.
+    control.replay();
+    buildScheduler();
+
+    cron.startJobNow(KEY_A);
+  }
+
+  @Test
+  public void testStartNonCronJob() throws Exception {
+    // Create a NON cron job and try to start it as though it were a cron job, and ensure that
+    // no cron tasks are created.
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(makeJob(KEY_A, 1));
+    String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
+
+    try {
+      cron.startJobNow(KEY_A);
+      fail("Start should have failed.");
+    } catch (ScheduleException e) {
+      // Expected.
+    }
+
+    assertEquals(PENDING, getTask(taskId).getStatus());
+    assertFalse(cron.hasJob(KEY_A));
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testStartNonOwnedCronJob() throws Exception {
+    // Try to start a cron job that is not owned by us.
+    // Should throw an exception.
+
+    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(true);
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+
+    cron.startJobNow(KEY_B);
+  }
+
+  @Test
+  public void testStartRunningCronJob() throws Exception {
+    // Start a cron job that is already started by an earlier
+    // call and is PENDING. Make sure it follows the cron collision policy.
+    SanitizedConfiguration sanitizedConfiguration =
+        makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
+    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
+        EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+    assertTrue(cron.hasJob(KEY_A));
+
+    cron.startJobNow(KEY_A);
+    assertTaskCount(1);
+
+    String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
+
+    // Now start the same cron job immediately.
+    cron.startJobNow(KEY_A);
+    assertTaskCount(1);
+    assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
+
+    // Make sure the pending job is the new one.
+    String newTaskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
+    assertFalse(taskId.equals(newTaskId));
+  }
+
+  @Test
+  public void testKillCreateCronJob() throws Exception {
+    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+    cronScheduler.deschedule("key");
+
+    SanitizedConfiguration updated = makeCronJob(KEY_A, 1, "1 2 3 4 5");
+    IJobConfiguration updatedJob = updated.getJobConfig();
+    expect(cronScheduler.schedule(eq(updatedJob.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("key2");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTrue(cron.hasJob(KEY_A));
+
+    scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
+    scheduler.createJob(updated);
+
+    IJobConfiguration stored = Iterables.getOnlyElement(cron.getJobs());
+    assertEquals(updatedJob.getCronSchedule(), stored.getCronSchedule());
+  }
+
   @Test
   public void testKillTask() throws Exception {
     driver.killTask(EasyMock.<String>anyObject());
@@ -482,21 +631,17 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testServiceTasksRescheduled() throws Exception {
     int numServiceTasks = 5;
-    IJobKey adhocKey = KEY_A;
-    IJobKey serviceKey = IJobKey.build(
-        adhocKey.newBuilder().setName(adhocKey.getName() + "service"));
 
     expectTaskNotThrottled().times(numServiceTasks);
-    expectNoCronJob(adhocKey);
-    expectNoCronJob(serviceKey);
 
     control.replay();
     buildScheduler();
 
     // Schedule 5 service and 5 non-service tasks.
-    scheduler.createJob(makeJob(adhocKey, numServiceTasks));
+    scheduler.createJob(makeJob(KEY_A, numServiceTasks));
     TaskConfig task = productionTask().setIsService(true);
-    scheduler.createJob(makeJob(serviceKey, task, 5));
+    scheduler.createJob(
+        makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
 
     assertEquals(10, getTasksByStatus(PENDING).size());
     changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
@@ -523,7 +668,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     int totalFailures = 10;
 
     expectTaskNotThrottled().times(totalFailures);
-    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -582,7 +726,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testNoTransitionFromTerminalState() throws Exception {
     expectKillTask(1);
-    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -603,7 +746,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testFailedTaskIncrementsFailureCount() throws Exception {
     int maxFailures = 5;
     expectTaskNotThrottled().times(maxFailures - 1);
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -636,9 +778,78 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillPendingTask() throws Exception {
-    expectNoCronJob(KEY_A);
+  public void testCronJobLifeCycle() throws Exception {
+    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 10, "1 1 1 1 1");
+    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+    assertTrue(cron.hasJob(KEY_A));
+
+    // Simulate a triggering of the cron job.
+    cron.startJobNow(KEY_A);
+    assertTaskCount(10);
+    assertEquals(10,
+        getTasks(Query.jobScoped(KEY_A).byStatus(PENDING)).size());
+
+    assertTaskCount(10);
+
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
+    assertTaskCount(10);
+    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
+    assertTaskCount(10);
+    changeStatus(Query.roleScoped(ROLE_A), FINISHED);
+  }
+
+  @Test
+  public void testCronNoSuicide() throws Exception {
+    SanitizedConfiguration sanitizedConfiguration =
+        makeCronJob(KEY_A, 10, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
+    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
+        EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(sanitizedConfiguration);
+    assertTaskCount(0);
+
+    try {
+      scheduler.createJob(sanitizedConfiguration);
+      fail();
+    } catch (ScheduleException e) {
+      // Expected.
+    }
+    assertTrue(cron.hasJob(KEY_A));
+
+    // Simulate a triggering of the cron job.
+    cron.startJobNow(KEY_A);
+    assertTaskCount(10);
 
+    Set<String> taskIds = Tasks.ids(getTasksOwnedBy(OWNER_A));
+
+    // Simulate a triggering of the cron job.
+    cron.startJobNow(KEY_A);
+    assertTaskCount(10);
+    assertTrue(Sets.intersection(taskIds, Tasks.ids(getTasksOwnedBy(OWNER_A))).isEmpty());
+
+    try {
+      scheduler.createJob(sanitizedConfiguration);
+      fail();
+    } catch (ScheduleException e) {
+      // Expected.
+    }
+    assertTrue(cron.hasJob(KEY_A));
+  }
+
+  @Test
+  public void testKillPendingTask() throws Exception {
     control.replay();
     buildScheduler();
 
@@ -673,12 +884,16 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testKillCronTask() throws Exception {
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-    cronJobManager.createJob(anyObject(SanitizedCronJob.class));
-    expect(cronJobManager.deleteJob(KEY_A)).andReturn(true);
+    SanitizedConfiguration sanitizedConfiguration =
+        makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
+    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
+        EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+    cronScheduler.deschedule("key");
+
     control.replay();
     buildScheduler();
-    scheduler.createJob(makeCronJob(KEY_A, 1, RAW_CRONTAB_ENTRY));
+    scheduler.createJob(makeCronJob(KEY_A, 1, "1 1 1 1 1"));
 
     // This will fail if the cron task could not be found.
     scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
@@ -688,7 +903,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testLostTaskRescheduled() throws Exception {
     expectKillTask(2);
     expectTaskNotThrottled().times(2);
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -718,10 +932,32 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillJob() throws Exception {
-    expectNoCronJob(KEY_A);
-    expect(cronJobManager.deleteJob(KEY_A)).andReturn(false);
+  public void testKillNotStrictlyJobScoped() throws Exception {
+    // Makes sure that queries that are not strictly job scoped will not remove the job entirely.
+    SanitizedConfiguration config = makeCronJob(KEY_A, 10, "1 1 1 1 1");
+    IJobConfiguration job = config.getJobConfig();
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("key");
+    cronScheduler.deschedule("key");
+
+    control.replay();
+    buildScheduler();
+
+    scheduler.createJob(config);
+    assertTrue(cron.hasJob(KEY_A));
+    cron.startJobNow(KEY_A);
+    assertTaskCount(10);
+
+    scheduler.killTasks(Query.instanceScoped(KEY_A, 0), USER_A);
+    assertTaskCount(9);
+    assertTrue(cron.hasJob(KEY_A));
 
+    scheduler.killTasks(Query.jobScoped(KEY_A), USER_A);
+    assertFalse(cron.hasJob(KEY_A));
+  }
+
+  @Test
+  public void testKillJob() throws Exception {
     control.replay();
     buildScheduler();
 
@@ -772,7 +1008,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test(expected = ScheduleException.class)
   public void testRestartNonexistentShard() throws Exception {
     expectTaskNotThrottled();
-    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -784,8 +1019,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testRestartPendingShard() throws Exception {
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
     control.replay();
     buildScheduler();
 
@@ -818,7 +1051,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testPortResourceResetAfterReschedule() throws Exception {
     expectKillTask(1);
     expectTaskNotThrottled();
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -881,7 +1113,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       }
     };
 
-    expectNoCronJob(KEY_A);
     control.replay();
     buildScheduler();
 
@@ -936,8 +1167,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     scheduler.addInstances(
         job.getKey(),
-        ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
-            DiscreteDomain.integers()),
+        ImmutableSet.copyOf(
+            ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
+                DiscreteDomain.integers())),
         job.getTaskConfig());
   }
 
@@ -986,7 +1218,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         .setOwner(OWNER_A);
 
     ImmutableSet<Integer> instances = ImmutableSet.of(0);
-    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -997,14 +1228,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
   }
 
-  private void expectNoCronJob(int times) throws CronException {
-    expect(cronJobManager.hasJob(isA(IJobKey.class))).andReturn(false).times(times);
-  }
-
-  private void expectNoCronJob(IJobKey jobKey) throws CronException {
-    expect(cronJobManager.hasJob(jobKey)).andReturn(false);
-  }
-
   private static String getLocalHost() {
     try {
       return InetAddress.getLocalHost().getHostName();
@@ -1171,23 +1394,4 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
     changeStatus(Query.taskScoped(taskId), status, message);
   }
-
-  private SanitizedConfiguration hasJobKey(final IJobKey key) {
-    reportMatcher(new IArgumentMatcher() {
-      @Override
-      public boolean matches(Object item) {
-        if (!(item instanceof SanitizedConfiguration)) {
-          return false;
-        }
-        SanitizedConfiguration configuration = (SanitizedConfiguration) item;
-        return key.equals(configuration.getJobConfig().getKey());
-      }
-
-      @Override
-      public void appendTo(StringBuffer buffer) {
-        buffer.append(key.toString());
-      }
-    });
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
new file mode 100644
index 0000000..fa9cb75
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
@@ -0,0 +1,490 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.concurrent.Executor;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ScheduleException;
+import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
+import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CronJobManagerTest extends EasyMockTest {
+
+  private static final String OWNER = "owner";
+  private static final String ENVIRONMENT = "staging11";
+  private static final String JOB_NAME = "jobName";
+  private static final String DEFAULT_JOB_KEY = "key";
+  private static final String TASK_ID = "id";
+  private static final IScheduledTask TASK = IScheduledTask.build(
+      new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(TASK_ID)));
+
+  private StateManager stateManager;
+  private Executor delayExecutor;
+  private Capture<Runnable> delayLaunchCapture;
+  private StorageTestUtil storageUtil;
+
+  private CronScheduler cronScheduler;
+  private ShutdownRegistry shutdownRegistry;
+  private CronJobManager cron;
+  private IJobConfiguration job;
+  private SanitizedConfiguration sanitizedConfiguration;
+
+  @Before
+  public void setUp() throws Exception {
+    stateManager = createMock(StateManager.class);
+    delayExecutor = createMock(Executor.class);
+    delayLaunchCapture = createCapture();
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    cronScheduler = createMock(CronScheduler.class);
+    shutdownRegistry = createMock(ShutdownRegistry.class);
+
+    cron = new CronJobManager(
+        stateManager,
+        storageUtil.storage,
+        cronScheduler,
+        shutdownRegistry,
+        delayExecutor);
+    job = makeJob();
+    sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
+  }
+
+  private void expectJobValidated(IJobConfiguration savedJob) {
+    expect(cronScheduler.isValidSchedule(savedJob.getCronSchedule())).andReturn(true);
+  }
+
+  private void expectJobValidated() {
+    expectJobValidated(sanitizedConfiguration.getJobConfig());
+  }
+
+  private Capture<Runnable> expectJobAccepted(IJobConfiguration savedJob) throws Exception {
+    expectJobValidated(savedJob);
+    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, savedJob);
+    Capture<Runnable> jobTriggerCapture = createCapture();
+    expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), capture(jobTriggerCapture)))
+        .andReturn(DEFAULT_JOB_KEY);
+    return jobTriggerCapture;
+  }
+
+  private void expectJobAccepted() throws Exception {
+    expectJobAccepted(sanitizedConfiguration.getJobConfig());
+  }
+
+  private void expectJobFetch() {
+    expectJobValidated();
+    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
+        .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
+  }
+
+  private IExpectationSetters<?> expectActiveTaskFetch(IScheduledTask... activeTasks) {
+    return storageUtil.expectTaskFetch(Query.jobScoped(job.getKey()).active(), activeTasks);
+  }
+
+  @Test
+  public void testPubsubWiring() throws Exception {
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
+        .andReturn(ImmutableList.<IJobConfiguration>of());
+
+    control.replay();
+
+    Injector injector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(StateManager.class).toInstance(stateManager);
+        bind(Storage.class).toInstance(storageUtil.storage);
+        bind(CronScheduler.class).toInstance(cronScheduler);
+        bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+        PubsubTestUtil.installPubsub(binder());
+        StateModule.bindCronJobManager(binder());
+      }
+    });
+    cron = injector.getInstance(CronJobManager.class);
+    EventSink eventSink = PubsubTestUtil.startPubsub(injector);
+    eventSink.post(new SchedulerActive());
+  }
+
+  @Test
+  public void testStart() throws Exception {
+    expectJobAccepted();
+    expectJobFetch();
+    expectActiveTaskFetch();
+
+    // Job is executed immediately since there are no existing tasks to kill.
+    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+    expect(cronScheduler.getSchedule(DEFAULT_JOB_KEY))
+        .andReturn(Optional.of(job.getCronSchedule()))
+        .times(2);
+
+    control.replay();
+
+    assertEquals(ImmutableMap.<IJobKey, String>of(), cron.getScheduledJobs());
+    cron.receiveJob(sanitizedConfiguration);
+    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
+    cron.startJobNow(job.getKey());
+    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
+  }
+
+  @Test
+  public void testDeleteInconsistent() throws Exception {
+    // Tests a case where a job exists in the storage, but is not registered with the cron system.
+
+    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
+        .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
+
+    control.replay();
+
+    assertTrue(cron.deleteJob(job.getKey()));
+  }
+
+  private IExpectationSetters<?> expectTaskKilled(String id) {
+    expect(stateManager.changeState(
+        id,
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.KILLING,
+        CronJobManager.KILL_AUDIT_MESSAGE))
+        .andReturn(true);
+    return expectLastCall();
+  }
+
+  @Test
+  public void testDelayedStart() throws Exception {
+    expectJobAccepted();
+    expectJobFetch();
+
+    // Query to test if live tasks exist for the job.
+    expectActiveTaskFetch(TASK);
+
+    // Live tasks exist, so the cron manager must delay the cron launch.
+    delayExecutor.execute(capture(delayLaunchCapture));
+
+    // The cron manager will then try to initiate the kill.
+    expectTaskKilled(TASK_ID);
+
+    // Immediate query and delayed query.
+    expectActiveTaskFetch(TASK).times(2);
+
+    // Simulate the live task disappearing.
+    expectActiveTaskFetch();
+
+    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+    cron.startJobNow(job.getKey());
+    assertEquals(ImmutableSet.of(job.getKey()), cron.getPendingRuns());
+    delayLaunchCapture.getValue().run();
+    assertEquals(ImmutableSet.<IJobKey>of(), cron.getPendingRuns());
+  }
+
+  @Test
+  public void testDelayedStartResets() throws Exception {
+    expectJobAccepted();
+    expectJobFetch();
+
+    // Query to test if live tasks exist for the job.
+    expectActiveTaskFetch(TASK);
+
+    // Live tasks exist, so the cron manager must delay the cron launch.
+    delayExecutor.execute(capture(delayLaunchCapture));
+
+    // The cron manager will then try to initiate the kill.
+    expectTaskKilled(TASK_ID);
+
+    // Immediate query and delayed query.
+    expectActiveTaskFetch(TASK).times(2);
+
+    // Simulate the live task disappearing.
+    expectActiveTaskFetch();
+
+    // Round two.
+    expectJobFetch();
+    expectActiveTaskFetch(TASK);
+    delayExecutor.execute(capture(delayLaunchCapture));
+    expectTaskKilled(TASK_ID);
+    expectActiveTaskFetch(TASK).times(2);
+    expectActiveTaskFetch();
+
+    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+    expectLastCall().times(2);
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+    cron.startJobNow(job.getKey());
+    delayLaunchCapture.getValue().run();
+
+    // Start the job again.  Since the previous delayed start completed, this should repeat the
+    // entire process.
+    cron.startJobNow(job.getKey());
+    delayLaunchCapture.getValue().run();
+  }
+
+  @Test
+  public void testDelayedStartMultiple() throws Exception {
+    expectJobAccepted();
+    expectJobFetch();
+
+    // Query to test if live tasks exist for the job.
+    expectActiveTaskFetch(TASK).times(3);
+
+    // Live tasks exist, so the cron manager must delay the cron launch.
+    delayExecutor.execute(capture(delayLaunchCapture));
+
+    // The cron manager will then try to initiate the kill.
+    expectJobFetch();
+    expectJobFetch();
+    expectTaskKilled(TASK_ID).times(3);
+
+    // Immediate queries and delayed query.
+    expectActiveTaskFetch(TASK).times(4);
+
+    // Simulate the live task disappearing.
+    expectActiveTaskFetch();
+
+    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+
+    // Attempt to trick the cron manager into launching multiple times, or launching multiple
+    // pollers.
+    cron.startJobNow(job.getKey());
+    cron.startJobNow(job.getKey());
+    cron.startJobNow(job.getKey());
+    delayLaunchCapture.getValue().run();
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    SanitizedConfiguration updated = new SanitizedConfiguration(
+        IJobConfiguration.build(job.newBuilder().setCronSchedule("1 2 3 4 5")));
+
+    expectJobAccepted();
+    cronScheduler.deschedule(DEFAULT_JOB_KEY);
+    expectJobAccepted(updated.getJobConfig());
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+    cron.updateJob(updated);
+  }
+
+  @Test
+  public void testConsistentState() throws Exception {
+    IJobConfiguration updated =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
+
+    expectJobAccepted();
+    cronScheduler.deschedule(DEFAULT_JOB_KEY);
+    expectJobAccepted(updated);
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+
+    IJobConfiguration failedUpdate =
+        IJobConfiguration.build(updated.newBuilder().setCronSchedule(null));
+    try {
+      cron.updateJob(new SanitizedConfiguration(failedUpdate));
+      fail();
+    } catch (ScheduleException e) {
+      // Expected.
+    }
+
+    cron.updateJob(new SanitizedConfiguration(updated));
+  }
+
+  @Test
+  public void testInvalidStoredJob() throws Exception {
+    // Invalid jobs are left alone, but doesn't halt operation.
+
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+    IJobConfiguration jobA =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5 6 7"));
+    IJobConfiguration jobB =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule(null));
+
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA, jobB));
+    expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(false);
+
+    control.replay();
+
+    cron.schedulerActive(new SchedulerActive());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testJobStoredTwice() throws Exception {
+    // Simulate an inconsistent storage that contains two cron jobs under the same key.
+
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+    IJobConfiguration jobA =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
+    IJobConfiguration jobB =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("* * * * *"));
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
+        .andReturn(ImmutableList.of(jobA, jobB));
+    expectJobValidated(jobA);
+    expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("keyA");
+    expectJobValidated(jobB);
+
+    control.replay();
+
+    cron.schedulerActive(new SchedulerActive());
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testInvalidCronSchedule() throws Exception {
+    expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(false);
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+  }
+
+  @Test
+  public void testCancelNewCollision() throws Exception {
+    IJobConfiguration killExisting = IJobConfiguration.build(
+        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));
+    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+    expectActiveTaskFetch(TASK);
+
+    control.replay();
+
+    cron.receiveJob(new SanitizedConfiguration(killExisting));
+    jobTriggerCapture.getValue().run();
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testScheduleFails() throws Exception {
+    expectJobValidated(job);
+    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, sanitizedConfiguration.getJobConfig());
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andThrow(new CronException("injected"));
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testRunOverlapRejected() throws Exception {
+    IJobConfiguration killExisting = IJobConfiguration.build(
+        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
+
+    control.replay();
+
+    cron.receiveJob(new SanitizedConfiguration(killExisting));
+  }
+
+  @Test
+  public void testRunOverlapLoadedSuccessfully() throws Exception {
+    // Existing RUN_OVERLAP jobs should still load and map.
+
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+    IJobConfiguration jobA =
+        IJobConfiguration.build(makeJob().newBuilder()
+            .setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
+
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA));
+    expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(true);
+    expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("keyA");
+
+    control.replay();
+
+    cron.schedulerActive(new SchedulerActive());
+  }
+
+  private IJobConfiguration makeJob() {
+    return IJobConfiguration.build(new JobConfiguration()
+        .setOwner(new Identity(OWNER, OWNER))
+        .setKey(JobKeys.from(OWNER, ENVIRONMENT, JOB_NAME).newBuilder())
+        .setCronSchedule("1 1 1 1 1")
+        .setTaskConfig(defaultTask())
+        .setInstanceCount(1));
+  }
+
+  private static TaskConfig defaultTask() {
+    return new TaskConfig()
+        .setContactEmail("testing@twitter.com")
+        .setExecutorConfig(new ExecutorConfig("aurora", "data"))
+        .setNumCpus(1)
+        .setRamMb(1024)
+        .setDiskMb(1024)
+        .setJobName(JOB_NAME)
+        .setOwner(new Identity(OWNER, OWNER))
+        .setEnvironment(DEFAULT_ENVIRONMENT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index cdb1f5d..c8ad55d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -133,6 +133,6 @@ public class LockManagerImplTest extends EasyMockTest {
 
   private void expectLockException(IJobKey key) {
     expectedException.expect(LockException.class);
-    expectedException.expectMessage(JobKeys.canonicalString(key));
+    expectedException.expectMessage(JobKeys.toPath(key));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index f761feb..f530c53 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -76,13 +76,10 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -138,7 +135,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final IJobKey JOB_KEY = JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, JOB_NAME);
   private static final ILockKey LOCK_KEY = ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
   private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
-  private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
+  private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("test");
   private static final Lock DEFAULT_LOCK = null;
 
   private static final IResourceAggregate QUOTA =
@@ -669,7 +666,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplate() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
+    cronJobManager.updateJob(anyObject(SanitizedConfiguration.class));
     control.replay();
 
     assertEquals(
@@ -704,9 +701,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplateDoesNotExist() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
-    expectLastCall().andThrow(new CronException("Nope"));
-
+    cronJobManager.updateJob(
+        SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB)));
+    expectLastCall().andThrow(new ScheduleException("Nope"));
     control.replay();
 
     assertEquals(
@@ -1011,7 +1008,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     Set<JobSummary> ownedImmedieteJobSummaryOnly = ImmutableSet.of(
         new JobSummary().setJob(ownedImmediateJob).setStats(new JobStats().setActiveTaskCount(1)));
 
-    expect(cronPredictor.predictNextRun(CrontabEntry.parse(CRON_SCHEDULE)))
+    expect(cronPredictor.predictNextRun(CRON_SCHEDULE))
         .andReturn(new Date(nextCronRunMs))
         .anyTimes();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 8673330..e212174 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -34,8 +34,8 @@ import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.gen.SessionKey;
-import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -149,7 +149,7 @@ public class ThriftIT extends EasyMockTest {
 
           @Override
           protected void configure() {
-            bindMock(CronJobManager.class);
+            bindMock(CronScheduler.class);
             bindMock(MaintenanceController.class);
             bindMock(Recovery.class);
             bindMock(SchedulerCore.class);


Mime
View raw message