aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevi...@apache.org
Subject git commit: CronScheduler implementation based on Quartz.
Date Wed, 05 Feb 2014 03:10:23 GMT
Updated Branches:
  refs/heads/kts/AURORA-132 [created] 676f2b151


CronScheduler implementation based on Quartz.

TODO (Before initial review):
  * Javadoc public classes.
  * Coverage report.

TODO (Subsequent reviews):
  * Drop .noop package; make QuartCronModule the default
  * Delete .testing package; merge QuartzCronIT and AbstractCronIT
  * Delete thrift testing fixtures.
  * Add examples to examples/jobs
  * Add e2e test coverage for examples
  * Document cron schedule syntax in config reference.
  * Create epic for reliable execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/676f2b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/676f2b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/676f2b15

Branch: refs/heads/kts/AURORA-132
Commit: 676f2b1518830a491a53280795269fc12bd406e1
Parents: 80106ee
Author: Kevin Sweeney <kevints@apache.org>
Authored: Mon Feb 3 17:11:57 2014 -0800
Committer: Kevin Sweeney <kevints@apache.org>
Committed: Tue Feb 4 19:09:47 2014 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../aurora/scheduler/cron/CronScheduler.java    |  23 +--
 .../scheduler/cron/noop/NoopCronScheduler.java  |  24 +--
 .../scheduler/cron/quartz/QuartzCronModule.java |  82 +++++++++
 .../cron/quartz/QuartzCronPredictor.java        |  47 +++++
 .../cron/quartz/QuartzCronScheduler.java        | 176 +++++++++++++++++++
 .../scheduler/cron/testing/AbstractCronIT.java  |  25 ++-
 .../aurora/scheduler/state/CronJobManager.java  |  10 +-
 .../aurora/scheduler/cron/noop/NoopCronIT.java  |   4 +-
 .../scheduler/cron/quartz/QuartzCronIT.java     |  63 +++++++
 .../cron/quartz/QuartzCronPredictorTest.java    |  55 ++++++
 .../cron/quartz/QuartzCronSchedulerTest.java    | 111 ++++++++++++
 .../scheduler/state/CronJobManagerTest.java     |   9 +-
 13 files changed, 573 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2f355d3..88ddcce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -190,6 +190,7 @@ dependencies {
   compile 'com.twitter.common:util-testing:0.0.7'
   compile 'com.twitter.common:util:0.0.87'
   compile 'com.twitter.common:zookeeper-testing:0.0.40'
+  compile 'org.quartz-scheduler:quartz:2.2.1'
   testCompile 'junit:junit:4.10'
 
   generatedCompile guavaDep

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
index 34e1749..56e9950 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
@@ -18,11 +18,12 @@ package org.apache.aurora.scheduler.cron;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
 
 /**
  * An execution manager that executes work on a cron schedule.
  */
-public interface CronScheduler {
+public interface CronScheduler extends Service {
   /**
    * Schedules a task on a cron schedule.
    *
@@ -53,26 +54,6 @@ public interface CronScheduler {
   Optional<String> getSchedule(String key) throws IllegalStateException;
 
   /**
-   * Block until fully initialized. It is an error to call start twice. Prior to calling
start,
-   * all other methods of this interface may throw {@link IllegalStateException}. The underlying
-   * implementation should not spawn threads or connect to databases prior to invocation
of
-   * {@link #start()}.
-   *
-   * @throws IllegalStateException If called twice.
-   */
-  void start() throws IllegalStateException;
-
-  /**
-   * Block until stopped. Generally this means that underlying resources are freed, threads
are
-   * terminated, and any bookkeeping state is persisted. If {@link #stop()} has already been
called
-   * by another thread, {@link #stop()} either blocks until completion or returns immediately.
-   *
-   * @throws CronException If there was a problem stopping the scheduler, for example if
it was not
-   *                       started.
-   */
-  void stop() throws CronException;
-
-  /**
    * Checks to see if the scheduler would be accepted by the underlying scheduler.
    *
    * @param schedule Cron scheduler to validate.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
index bbee5ef..a31551c 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
@@ -23,8 +23,8 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractIdleService;
 
-import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronScheduler;
 
 /**
@@ -35,13 +35,23 @@ import org.apache.aurora.scheduler.cron.CronScheduler;
  * This class exists as a short term hack to get around a license compatibility issue - Real
  * Implementation (TM) coming soon.
  */
-class NoopCronScheduler implements CronScheduler {
+class NoopCronScheduler extends AbstractIdleService implements CronScheduler {
   private static final Logger LOG = Logger.getLogger(NoopCronScheduler.class.getName());
 
   // Keep a list of schedules we've seen.
   private final Set<String> schedules = Collections.synchronizedSet(Sets.<String>newHashSet());
 
   @Override
+  public void startUp() throws Exception {
+    LOG.warning("NO-OP cron scheduler is in use. Cron jobs submitted will not be triggered!");
+  }
+
+  @Override
+  public void shutDown() {
+    // No-op.
+  }
+
+  @Override
   public String schedule(String schedule, Runnable task) {
     schedules.add(schedule);
 
@@ -66,16 +76,6 @@ class NoopCronScheduler implements CronScheduler {
   }
 
   @Override
-  public void start() throws IllegalStateException {
-    LOG.warning("NO-OP cron scheduler is in use. Cron jobs submitted will not be triggered!");
-  }
-
-  @Override
-  public void stop() throws CronException {
-    // No-op.
-  }
-
-  @Override
   public boolean isValidSchedule(@Nullable String schedule) {
     // Accept everything.
     return schedule != null;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronModule.java
new file mode 100644
index 0000000..7c4bddf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronModule.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Throwables;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.DirectSchedulerFactory;
+import org.quartz.simpl.RAMJobStore;
+import org.quartz.simpl.SimpleThreadPool;
+import org.quartz.spi.ThreadPool;
+
+/**
+ * Created by ksweeney on 2/3/14.
+ */
+public class QuartzCronModule extends PrivateModule {
+  private static final Logger LOG = Logger.getLogger(QuartzCronModule.class.getName());
+
+  // TODO(kevints): Consider making this configurable if the global write lock goes away.
+  private static final int NUM_THREADS = 1;
+
+  @Override
+  protected void configure() {
+    bind(CronScheduler.class).to(QuartzCronScheduler.class);
+    bind(QuartzCronScheduler.class).in(Singleton.class);
+
+    bind(CronPredictor.class).to(QuartzCronPredictor.class);
+    bind(QuartzCronPredictor.class).in(Singleton.class);
+
+    expose(CronPredictor.class);
+    expose(CronScheduler.class);
+  }
+
+  /*
+   * XXX: Quartz implements DirectSchedulerFactory as a mutable global singleton in a static
+   * variable. While the Scheduler instances it produces are independent we synchronize here
to
+   * avoid an initialization race across injectors. In practice this only shows up during
testing;
+   * production Aurora instances will only have one object graph at a time.
+   */
+  @Provides
+  private static synchronized Scheduler provideScheduler(ThreadPool threadPool) {
+    DirectSchedulerFactory schedulerFactory = DirectSchedulerFactory.getInstance();
+    String schedulerName = "aurora-cron-" + UUID.randomUUID().toString();
+    try {
+      schedulerFactory.createScheduler(schedulerName, schedulerName, threadPool, new RAMJobStore());
+      return schedulerFactory.getScheduler(schedulerName);
+    } catch (SchedulerException e) {
+      LOG.severe("Error initializing Quartz cron scheduler: " + e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Provides
+  private ThreadPool provideThreadPool() {
+    SimpleThreadPool simpleThreadPool = new SimpleThreadPool(NUM_THREADS, Thread.NORM_PRIORITY);
+    simpleThreadPool.setMakeThreadsDaemons(true);
+    return simpleThreadPool;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictor.java
b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictor.java
new file mode 100644
index 0000000..e66de2e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictor.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.text.ParseException;
+import java.util.Date;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.quartz.CronExpression;
+
+class QuartzCronPredictor implements CronPredictor {
+  private final Clock clock;
+
+  @Inject
+  public QuartzCronPredictor(Clock clock) {
+    this.clock = Preconditions.checkNotNull(clock);
+  }
+
+  @Override
+  public Date predictNextRun(String schedule) {
+    try {
+      CronExpression cronExpression = new CronExpression(schedule);
+      return cronExpression.getNextValidTimeAfter(new Date(clock.nowMillis()));
+    } catch (ParseException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronScheduler.java
b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronScheduler.java
new file mode 100644
index 0000000..90829bc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronScheduler.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.text.ParseException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.base.Function;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+class QuartzCronScheduler extends AbstractIdleService implements CronScheduler {
+  private static final Logger LOG = Logger.getLogger(QuartzCronScheduler.class.getName());
+  private static final String QUARTZ_GROUP_NAME = "aurora-cron";
+  private static org.quartz.JobKey getQuartzJobKey(String cronJobId) {
+    return org.quartz.JobKey.jobKey(cronJobId, QUARTZ_GROUP_NAME);
+  }
+
+  private final AtomicInteger startedFlag = Stats.exportInt("quartz_scheduler_running");
+
+  private final AtomicLong cronJobIdGenerator = new AtomicLong();
+  private final ConcurrentMap<String, Runnable> cronJobIdToTask = Maps.newConcurrentMap();
+  {
+    Stats.exportSize("quartz_scheduler_cron_jobs_scheduled", cronJobIdToTask);
+  }
+  private final Job executeFromIdMap = new Job() {
+    @Override public void execute(JobExecutionContext context) throws JobExecutionException
{
+      cronJobIdToTask.get(context.getJobDetail().getKey().getName()).run();
+    }
+  };
+  private final JobFactory executeFromIdMapFactory = new JobFactory() {
+    @Override public Job newJob(TriggerFiredBundle unused1, Scheduler unused2) {
+      return executeFromIdMap;
+    }
+  };
+
+  private final Scheduler scheduler;
+
+  @Inject
+  QuartzCronScheduler(Scheduler scheduler) {
+    this.scheduler = checkNotNull(scheduler);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting Quartz cron scheduler.");
+    scheduler.setJobFactory(executeFromIdMapFactory);
+    scheduler.start();
+    startedFlag.set(1);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Shutting down Quartz cron scheduler.");
+    scheduler.shutdown();
+    startedFlag.set(0);
+  }
+
+  @Override
+  public String schedule(String schedule, Runnable task) throws CronException {
+    checkNotNull(schedule);
+    checkNotNull(task);
+
+    CronExpression cronExpression;
+    try {
+      cronExpression = new CronExpression(schedule);
+    } catch (ParseException e) {
+      throw new CronException(
+          String.format("Invalid cron expression %s: %s.", schedule, e.getMessage()), e);
+    }
+
+    String cronJobId = Long.toString(cronJobIdGenerator.incrementAndGet());
+    cronJobIdToTask.put(cronJobId, task);
+
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity(cronJobId, QUARTZ_GROUP_NAME)
+        .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+        .build();
+
+    JobDetail jobDetail = JobBuilder.newJob(executeFromIdMap.getClass())
+        .withIdentity(getQuartzJobKey(cronJobId))
+        .build();
+
+    try {
+      scheduler.scheduleJob(jobDetail, trigger);
+      return cronJobId;
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Failed to schedule cron job " + cronJobId + ": " + e, e);
+      throw new CronException(e);
+    }
+  }
+
+  @Override
+  public void deschedule(String cronJobId) {
+    checkNotNull(cronJobId);
+
+    cronJobIdToTask.remove(cronJobId);
+    try {
+      scheduler.deleteJob(getQuartzJobKey(cronJobId));
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Couldn't delete job " + cronJobId + " from Quartz: " + e, e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public Optional<String> getSchedule(String cronJobId) throws IllegalStateException
{
+    checkNotNull(cronJobId);
+
+    if (!cronJobIdToTask.containsKey(cronJobId)) {
+      return Optional.absent();
+    }
+
+    try {
+      return FluentIterable.from(scheduler.getTriggersOfJob(getQuartzJobKey(cronJobId)))
+          .filter(CronTrigger.class)
+          .transform(new Function<CronTrigger, String>() {
+            @Override public String apply(CronTrigger trigger) {
+              return trigger.getCronExpression();
+            }
+          })
+          .first();
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Error reading job " + cronJobId + " from Quartz: " + e, e);
+      return Optional.absent();
+    }
+  }
+
+  @Override
+  public boolean isValidSchedule(@Nullable String schedule) {
+    return CronExpression.isValidExpression(schedule);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java b/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
index 54b6fab..175400a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright 2013 Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,6 +15,7 @@
  */
 package org.apache.aurora.scheduler.cron.testing;
 
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -23,7 +24,6 @@ import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.junit.Test;
 
-import static org.apache.aurora.gen.test.testConstants.VALID_CRON_SCHEDULES;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -36,14 +36,14 @@ public abstract class AbstractCronIT extends EasyMockTest {
   protected abstract CronScheduler makeCronScheduler() throws Exception;
 
   /**
-   * Child should configure expectations for a scheduler start.
+   * Child should return cron expressions that are expected to pass validation.
    */
-  protected abstract void expectStartCronScheduler();
+  protected abstract Collection<String> getValidCronSchedules();
 
   /**
-   * Child should configure expectations for a scheduler stop.
+   * Child should return a "wildcard" cron expression that executes at every possible moment.
    */
-  protected abstract void expectStopCronScheduler();
+  protected abstract String getWildcardCronSchedule();
 
   /**
    * Child should return an instance of the {@link CronPredictor} under test here.
@@ -54,20 +54,17 @@ public abstract class AbstractCronIT extends EasyMockTest {
   public void testCronSchedulerLifecycle() throws Exception {
     CronScheduler scheduler = makeCronScheduler();
 
-    expectStartCronScheduler();
-    expectStopCronScheduler();
-
     control.replay();
 
-    scheduler.start();
+    scheduler.startAsync().awaitRunning();
     final CountDownLatch cronRan = new CountDownLatch(1);
-    scheduler.schedule("* * * * *", new Runnable() {
+    scheduler.schedule(getWildcardCronSchedule(), new Runnable() {
       @Override public void run() {
         cronRan.countDown();
       }
     });
     cronRan.await();
-    scheduler.stop();
+    scheduler.stopAsync().awaitTerminated();
   }
 
   @Test
@@ -75,7 +72,7 @@ public abstract class AbstractCronIT extends EasyMockTest {
     control.replay();
 
     CronPredictor cronPredictor = makeCronPredictor();
-    for (String schedule : VALID_CRON_SCHEDULES) {
+    for (String schedule : getValidCronSchedules()) {
       cronPredictor.predictNextRun(schedule);
     }
   }
@@ -86,7 +83,7 @@ public abstract class AbstractCronIT extends EasyMockTest {
 
     control.replay();
 
-    for (String schedule : VALID_CRON_SCHEDULES) {
+    for (String schedule : getValidCronSchedules()) {
       assertTrue(String.format("Cron schedule %s should validate.", schedule),
           cron.isValidSchedule(schedule));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index 371addf..6c74348 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -40,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.base.Command;
 import com.twitter.common.base.Supplier;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -165,10 +165,10 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
    */
   @Subscribe
   public void schedulerActive(SchedulerActive schedulerActive) {
-    cron.start();
-    shutdownRegistry.addAction(new ExceptionalCommand<CronException>() {
-      @Override public void execute() throws CronException {
-        cron.stop();
+    cron.startAsync().awaitRunning();
+    shutdownRegistry.addAction(new Command() {
+      @Override public void execute() {
+        cron.stopAsync().awaitTerminated();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/test/java/org/apache/aurora/scheduler/cron/noop/NoopCronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/noop/NoopCronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/noop/NoopCronIT.java
index 566fb47..b218437 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/noop/NoopCronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/noop/NoopCronIT.java
@@ -41,8 +41,8 @@ public class NoopCronIT {
 
   @Test
   public void testLifecycle() throws Exception {
-    cronScheduler.start();
-    cronScheduler.stop();
+    cronScheduler.startAsync().awaitRunning();
+    cronScheduler.stopAsync().awaitTerminated();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronIT.java
new file mode 100644
index 0000000..c7b513d
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronIT.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Collection;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.testing.AbstractCronIT;
+
+/**
+ * Created by ksweeney on 2/4/14.
+ */
+public class QuartzCronIT extends AbstractCronIT {
+  private static final String WILDCARD_SCHEDULE = "* * * * * ?";
+
+  private Injector makeInjector() {
+    return Guice.createInjector(new QuartzCronModule(), new AbstractModule() {
+      @Override protected void configure() {
+        bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+      }
+    });
+  }
+
+  @Override
+  protected CronScheduler makeCronScheduler() throws Exception {
+    return makeInjector().getInstance(CronScheduler.class);
+  }
+
+  @Override
+  protected Collection<String> getValidCronSchedules() {
+    return ImmutableSet.of(WILDCARD_SCHEDULE);
+  }
+
+  @Override
+  protected String getWildcardCronSchedule() {
+    return WILDCARD_SCHEDULE;
+  }
+
+  @Override
+  protected CronPredictor makeCronPredictor() throws Exception {
+    return makeInjector().getInstance(CronPredictor.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictorTest.java
b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictorTest.java
new file mode 100644
index 0000000..d8d8867
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronPredictorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class QuartzCronPredictorTest {
+  private CronPredictor cronPredictor;
+
+  private FakeClock clock;
+
+  @Before
+  public void setUp() {
+    clock = new FakeClock();
+
+    cronPredictor = new QuartzCronPredictor(clock);
+  }
+
+  @Test
+  public void testValidSchedule() {
+    clock.advance(Amount.of(1L, Time.DAYS));
+    Date expectedPrediction = new Date(
+        Amount.of(1L, Time.DAYS).as(Time.MILLISECONDS)
+            + Amount.of(1L, Time.SECONDS).as(Time.MILLISECONDS));
+    assertEquals(expectedPrediction, cronPredictor.predictNextRun("* * * * * ?"));
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testInvalidSchedule() {
+    cronPredictor.predictNextRun("INVALID SCHEDULE");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronSchedulerTest.java
b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronSchedulerTest.java
new file mode 100644
index 0000000..7e56e55
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzCronSchedulerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.spi.JobFactory;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+
+public class QuartzCronSchedulerTest extends EasyMockTest {
+  private Scheduler scheduler;
+
+  private CronScheduler cronScheduler;
+
+  @Before
+  public void setUp() {
+    scheduler = createMock(Scheduler.class);
+
+    cronScheduler = new QuartzCronScheduler(scheduler);
+  }
+
+  @Test
+  public void testLifecycle() throws Exception {
+    scheduler.setJobFactory(isA(JobFactory.class));
+    scheduler.start();
+    scheduler.shutdown();
+
+    control.replay();
+
+    cronScheduler.startAsync().awaitRunning();
+    cronScheduler.stopAsync().awaitTerminated();
+  }
+
+  @Test(expected = CronException.class)
+  public void testScheduleFailed() throws Exception {
+    expect(scheduler.scheduleJob(isA(JobDetail.class), isA(Trigger.class)))
+        .andThrow(new SchedulerException("Test fail."));
+
+    control.replay();
+
+    cronScheduler.schedule("* * * * * ?", new Runnable() {
+      @Override public void run() {
+       // Not reached.
+      }
+   });
+  }
+
+  @Test
+  public void testJobLifecycle() throws Exception {
+    String schedule = "* */3 */2 * * ?";
+
+    Capture<JobDetail> scheduledJobDetail = createCapture();
+    final Capture<Trigger> cronTrigger = createCapture();
+    Capture<JobKey> descheduledJobKey = createCapture();
+    expect(scheduler.scheduleJob(capture(scheduledJobDetail), capture(cronTrigger)))
+        .andReturn(null);
+
+    expect(scheduler.getTriggersOfJob(isA(JobKey.class)));
+    expectLastCall().andAnswer(new IAnswer<Object>() {
+      @Override public Object answer() throws Throwable {
+        return ImmutableList.of(cronTrigger.getValue());
+      }
+    });
+    expect(scheduler.deleteJob(capture(descheduledJobKey))).andReturn(true);
+
+    control.replay();
+
+    String cronJobId = cronScheduler.schedule(schedule, new Runnable() {
+      @Override public void run() {
+        // Not reached.
+      }
+    });
+
+    assertEquals(schedule, cronScheduler.getSchedule(cronJobId).orNull());
+
+    cronScheduler.deschedule(cronJobId);
+
+    assertEquals(scheduledJobDetail.getValue().getKey(), descheduledJobKey.getValue());
+    assertEquals(null, cronScheduler.getSchedule(cronJobId).orNull());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/676f2b15/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
index 684e239..86ef6d5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
@@ -140,7 +140,8 @@ public class CronJobManagerTest extends EasyMockTest {
 
   @Test
   public void testPubsubWiring() throws Exception {
-    cronScheduler.start();
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
     shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
     expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
         .andReturn(ImmutableList.<IJobConfiguration>of());
@@ -351,7 +352,8 @@ public class CronJobManagerTest extends EasyMockTest {
   public void testInvalidStoredJob() throws Exception {
     // Invalid jobs are left alone, but doesn't halt operation.
 
-    cronScheduler.start();
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
     shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
 
     IJobConfiguration jobA =
@@ -371,7 +373,8 @@ public class CronJobManagerTest extends EasyMockTest {
   public void testJobStoredTwice() throws Exception {
     // Simulate an inconsistent storage that contains two cron jobs under the same key.
 
-    cronScheduler.start();
+    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
+    cronScheduler.awaitRunning();
     shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
 
     IJobConfiguration jobA =


Mime
View raw message