aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevi...@apache.org
Subject [4/5] Revert "AURORA-132: Cron system based on Quartz"
Date Thu, 24 Apr 2014 01:18:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
index 6d76f60..9831012 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
@@ -39,8 +39,8 @@ import com.twitter.common.net.pool.DynamicHostSet;
 import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
 import com.twitter.thrift.ServiceInstance;
 
-import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index 823668f..efea75f 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -25,6 +25,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.twitter.common.base.Closure;
 import com.twitter.common.thrift.Util;
@@ -33,7 +34,7 @@ import org.antlr.stringtemplate.StringTemplate;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -42,8 +43,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.thrift.TBase;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Servlet that prints out the raw configuration for a specified struct.
  */
@@ -51,13 +50,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class StructDump extends JerseyTemplateServlet {
 
   private final Storage storage;
-  private final CronJobManager cronJobManager;
 
   @Inject
-  public StructDump(Storage storage, CronJobManager cronJobManager) {
+  public StructDump(Storage storage) {
     super("structdump");
-    this.storage = checkNotNull(storage);
-    this.cronJobManager = checkNotNull(cronJobManager);
+    this.storage = Preconditions.checkNotNull(storage);
   }
 
   private static final String USAGE =
@@ -109,11 +106,11 @@ public class StructDump extends JerseyTemplateServlet {
       @PathParam("job") final String job) {
 
     final IJobKey jobKey = JobKeys.from(role, environment, job);
-    return dumpEntity("Cron job " + JobKeys.canonicalString(jobKey),
+    return dumpEntity("Cron job " + JobKeys.toPath(jobKey),
         new Work.Quiet<Optional<? extends TBase<?, ?>>>() {
           @Override
           public Optional<JobConfiguration> apply(StoreProvider storeProvider) {
-            return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(), jobKey)
+            return storeProvider.getJobStore().fetchJob(CronJobManager.MANAGER_KEY, jobKey)
                 .transform(IJobConfiguration.TO_BUILDER);
           }
         });

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
new file mode 100644
index 0000000..4bd190c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -0,0 +1,484 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ScheduleException;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.commons.lang.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+
+/**
+ * A job scheduler that receives jobs that should be run periodically on a cron schedule.
+ */
+public class CronJobManager implements EventSubscriber {
+
+  public static final String MANAGER_KEY = "CRON";
+
+  @VisibleForTesting
+  static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cron");
+
+  private static final Logger LOG = Logger.getLogger(CronJobManager.class.getName());
+
+  @CmdLine(name = "cron_start_initial_backoff", help =
+      "Initial backoff delay while waiting for a previous cron run to start.")
+  private static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
+      Arg.create(Amount.of(1L, Time.SECONDS));
+
+  @CmdLine(name = "cron_start_max_backoff", help =
+      "Max backoff delay while waiting for a previous cron run to start.")
+  private static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  private final AtomicLong cronJobsTriggered = Stats.exportLong("cron_jobs_triggered");
+  private final AtomicLong cronJobLaunchFailures = Stats.exportLong("cron_job_launch_failures");
+
+  // Maps from the unique job identifier to the unique identifier used internally by the
cron
+  // scheduler.
+  private final Map<IJobKey, String> scheduledJobs =
+      Collections.synchronizedMap(Maps.<IJobKey, String>newHashMap());
+
+  // Prevents runs from dogpiling while waiting for a run to transition out of the KILLING
state.
+  // This is necessary because killing a job (if dictated by cron collision policy) is an
+  // asynchronous operation.
+  private final Map<IJobKey, SanitizedConfiguration> pendingRuns =
+      Collections.synchronizedMap(Maps.<IJobKey, SanitizedConfiguration>newHashMap());
+
+  private final StateManager stateManager;
+  private final Storage storage;
+  private final CronScheduler cron;
+  private final ShutdownRegistry shutdownRegistry;
+  private final BackoffHelper delayedStartBackoff;
+  private final Executor delayedRunExecutor;
+
+  @Inject
+  CronJobManager(
+      StateManager stateManager,
+      Storage storage,
+      CronScheduler cron,
+      ShutdownRegistry shutdownRegistry) {
+
+    this(
+        stateManager,
+        storage,
+        cron,
+        shutdownRegistry,
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CronDelay-%d").build()));
+  }
+
+  @VisibleForTesting
+  CronJobManager(
+      StateManager stateManager,
+      Storage storage,
+      CronScheduler cron,
+      ShutdownRegistry shutdownRegistry,
+      Executor delayedRunExecutor) {
+
+    this.stateManager = checkNotNull(stateManager);
+    this.storage = checkNotNull(storage);
+    this.cron = checkNotNull(cron);
+    this.shutdownRegistry = checkNotNull(shutdownRegistry);
+    this.delayedStartBackoff =
+        new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get());
+    this.delayedRunExecutor = checkNotNull(delayedRunExecutor);
+
+    Stats.exportSize("cron_num_pending_runs", pendingRuns);
+  }
+
+  private void mapScheduledJob(SanitizedCronJob cronJob) throws ScheduleException {
+    IJobKey jobKey = cronJob.config.getJobConfig().getKey();
+    synchronized (scheduledJobs) {
+      Preconditions.checkState(
+          !scheduledJobs.containsKey(jobKey),
+          "Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
+      scheduledJobs.put(jobKey, scheduleJob(cronJob));
+    }
+  }
+
+  /**
+   * Notifies the cron job manager that the scheduler is active, and job configurations are
ready to
+   * load.
+   *
+   * @param schedulerActive Event.
+   */
+  @Subscribe
+  public void schedulerActive(SchedulerActive schedulerActive) {
+    cron.startAsync().awaitRunning();
+    shutdownRegistry.addAction(new Command() {
+      @Override
+      public void execute() {
+        // NOTE: We don't know ahead-of-time which thread will execute the shutdown command,
+        // so we shouldn't block here.
+        cron.stopAsync();
+      }
+    });
+
+    Iterable<IJobConfiguration> crons =
+        storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>()
{
+          @Override
+          public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider)
{
+            return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
+          }
+        });
+
+    for (IJobConfiguration job : crons) {
+      try {
+        mapScheduledJob(new SanitizedCronJob(job, cron));
+      } catch (ScheduleException | TaskDescriptionException e) {
+        logLaunchFailure(job, e);
+      }
+    }
+  }
+
+  private void logLaunchFailure(IJobConfiguration job, Exception e) {
+    cronJobLaunchFailures.incrementAndGet();
+    LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
+  }
+
+  /**
+   * Triggers execution of a job.
+   *
+   * @param jobKey Key of the job to start.
+   * @throws ScheduleException If the job could not be started with the cron system.
+   * @throws TaskDescriptionException If the stored job associated with {@code jobKey} has
field
+   *         validation problems.
+   */
+  public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException
{
+    Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
+    if (!jobConfig.isPresent()) {
+      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
+    }
+
+    cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
+  }
+
+  private void delayedRun(final Query.Builder query, final SanitizedConfiguration config)
{
+    IJobConfiguration job = config.getJobConfig();
+    final String jobPath = JobKeys.toPath(job);
+    final IJobKey jobKey = job.getKey();
+    LOG.info("Waiting for job to terminate before launching cron job " + jobPath);
+    if (pendingRuns.put(jobKey, config) == null) {
+      LOG.info("Launching a task to wait for job to finish: " + jobPath);
+      // There was no run already pending for this job, launch a task to delay launch until
the
+      // existing run has terminated.
+      delayedRunExecutor.execute(new Runnable() {
+        @Override
+        public void run() {
+          runWhenTerminated(query, jobKey);
+        }
+      });
+    }
+  }
+
+  private void runWhenTerminated(final Query.Builder query, final IJobKey jobKey) {
+    try {
+      delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
+            LOG.info("Initiating delayed launch of cron " + jobKey);
+            SanitizedConfiguration config = pendingRuns.remove(jobKey);
+            checkNotNull(config, "Failed to fetch job for delayed run of " + jobKey);
+            LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
+            stateManager.insertPendingTasks(config.getTaskConfigs());
+            return true;
+          } else {
+            LOG.info("Not yet safe to run cron " + jobKey);
+            return false;
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + jobKey, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void killActiveTasks(Set<String> taskIds) {
+    if (taskIds.isEmpty()) {
+      return;
+    }
+
+    for (String taskId : taskIds) {
+      stateManager.changeState(
+          taskId,
+          Optional.<ScheduleStatus>absent(),
+          KILLING,
+          KILL_AUDIT_MESSAGE);
+    }
+  }
+
+  public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
+    return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
+  }
+
+  /**
+   * Triggers execution of a cron job, depending on the cron collision policy for the job.
+   *
+   * @param cronJob The job to be triggered.
+   */
+  private void cronTriggered(SanitizedCronJob cronJob) {
+    final SanitizedConfiguration config = cronJob.config;
+    final IJobConfiguration job = config.getJobConfig();
+    LOG.info(String.format("Cron triggered for %s at %s with policy %s",
+        JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
+    cronJobsTriggered.incrementAndGet();
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+        ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
+        Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
+        Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+
+        if (activeTasks.isEmpty()) {
+          builder.putAll(config.getTaskConfigs());
+        } else {
+          switch (orDefault(job.getCronCollisionPolicy())) {
+            case KILL_EXISTING:
+              killActiveTasks(activeTasks);
+              delayedRun(activeQuery, config);
+              break;
+
+            case CANCEL_NEW:
+              break;
+
+            case RUN_OVERLAP:
+              LOG.severe(String.format(
+                  "Ignoring trigger for job %s with deprecated collision"
+                  + "policy RUN_OVERLAP due to unterminated active tasks.",
+                  JobKeys.toPath(job)));
+              break;
+
+            default:
+              LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
+          }
+        }
+
+        Map<Integer, ITaskConfig> newTasks = builder.build();
+        if (!newTasks.isEmpty()) {
+          stateManager.insertPendingTasks(newTasks);
+        }
+      }
+    });
+  }
+
+  /**
+   * Updates (re-schedules) the existing cron job.
+   *
+   * @param config New job configuration to update to.
+   * @throws ScheduleException If non-cron job confuration provided.
+   */
+  public void updateJob(SanitizedConfiguration config) throws ScheduleException {
+    IJobConfiguration job = config.getJobConfig();
+    if (!hasCronSchedule(job)) {
+      throw new ScheduleException("A cron job may not be updated to a non-cron job.");
+    }
+    String key = scheduledJobs.remove(job.getKey());
+    if (key == null) {
+      throw new ScheduleException(
+          "No cron template found for the given key: " + JobKeys.toPath(job));
+    }
+    cron.deschedule(key);
+    checkArgument(receiveJob(config));
+  }
+
+  private static boolean hasCronSchedule(IJobConfiguration job) {
+    checkNotNull(job);
+    return !StringUtils.isEmpty(job.getCronSchedule());
+  }
+
+  public boolean receiveJob(SanitizedConfiguration config) throws ScheduleException {
+    final IJobConfiguration job = config.getJobConfig();
+    if (!hasCronSchedule(job)) {
+      return false;
+    }
+
+    if (CronCollisionPolicy.RUN_OVERLAP.equals(job.getCronCollisionPolicy())) {
+      throw new ScheduleException(
+          "The RUN_OVERLAP collision policy has been removed (AURORA-38).");
+    }
+
+    SanitizedCronJob cronJob = new SanitizedCronJob(config, cron);
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
+      }
+    });
+    mapScheduledJob(cronJob);
+
+    return true;
+  }
+
+  private String scheduleJob(final SanitizedCronJob cronJob) throws ScheduleException {
+    IJobConfiguration job = cronJob.config.getJobConfig();
+    final String jobPath = JobKeys.toPath(job);
+    LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
+    try {
+      return cron.schedule(job.getCronSchedule(), new Runnable() {
+        @Override
+        public void run() {
+          // TODO(William Farner): May want to record information about job runs.
+          LOG.info("Running cron job: " + jobPath);
+          cronTriggered(cronJob);
+        }
+      });
+    } catch (CronException e) {
+      throw new ScheduleException("Failed to schedule cron job: " + e.getMessage(), e);
+    }
+  }
+
+  public Iterable<IJobConfiguration> getJobs() {
+    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>()
{
+      @Override
+      public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider)
{
+        return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
+      }
+    });
+  }
+
+  public boolean hasJob(IJobKey jobKey) {
+    return fetchJob(jobKey).isPresent();
+  }
+
+  private Optional<IJobConfiguration> fetchJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+    return storage.consistentRead(new Work.Quiet<Optional<IJobConfiguration>>()
{
+      @Override
+      public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider)
{
+        return storeProvider.getJobStore().fetchJob(MANAGER_KEY, jobKey);
+      }
+    });
+  }
+
+  public boolean deleteJob(final IJobKey jobKey) {
+    Optional<IJobConfiguration> job = fetchJob(jobKey);
+    if (!job.isPresent()) {
+      return false;
+    }
+
+    String scheduledJobKey = scheduledJobs.remove(jobKey);
+    if (scheduledJobKey != null) {
+      cron.deschedule(scheduledJobKey);
+      storage.write(new MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(Storage.MutableStoreProvider storeProvider) {
+          storeProvider.getJobStore().removeJob(jobKey);
+        }
+      });
+      LOG.info("Successfully deleted cron job " + jobKey);
+    }
+    return true;
+  }
+
+  private final Function<String, String> keyToSchedule = new Function<String, String>()
{
+    @Override
+    public String apply(String key) {
+      return cron.getSchedule(key).or("Not found.");
+    }
+  };
+
+  public Map<IJobKey, String> getScheduledJobs() {
+    synchronized (scheduledJobs) {
+      return ImmutableMap.copyOf(Maps.transformValues(scheduledJobs, keyToSchedule));
+    }
+  }
+
+  public Set<IJobKey> getPendingRuns() {
+    synchronized (pendingRuns) {
+      return ImmutableSet.copyOf(pendingRuns.keySet());
+    }
+  }
+
+  /**
+   * Used by functions that expect field validation before being called.
+   */
+  private static class SanitizedCronJob {
+    private final SanitizedConfiguration config;
+
+    SanitizedCronJob(IJobConfiguration unsanitized, CronScheduler cron)
+        throws ScheduleException, TaskDescriptionException {
+
+      this(SanitizedConfiguration.fromUnsanitized(unsanitized), cron);
+    }
+
+    SanitizedCronJob(SanitizedConfiguration config, CronScheduler cron) throws ScheduleException
{
+      final IJobConfiguration job = config.getJobConfig();
+      if (!hasCronSchedule(job)) {
+        throw new ScheduleException(
+            String.format("Not a valid cronjob, %s has no cron schedule", JobKeys.toPath(job)));
+      }
+
+      if (!cron.isValidSchedule(job.getCronSchedule())) {
+        throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
+      }
+
+      this.config = config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index e3b5b04..5696485 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl implements LockManager {
   private static String formatLockKey(ILockKey lockKey) {
     switch (lockKey.getSetField()) {
       case JOB:
-        return JobKeys.canonicalString(lockKey.getJob());
+        return JobKeys.toPath(lockKey.getJob());
       default:
         return "Unknown lock key type: " + lockKey.getSetField();
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 09b7fbd..5369279 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -40,9 +40,6 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -74,7 +71,7 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   // TODO(wfarner): Consider changing this class to not be concerned with cron jobs, requiring
the
   // caller to deal with the fork.
-  private final CronJobManager cronJobManager;
+  private final CronJobManager cronScheduler;
 
   // State manager handles persistence of task modifications and state transitions.
   private final StateManager stateManager;
@@ -86,7 +83,7 @@ class SchedulerCoreImpl implements SchedulerCore {
    * Creates a new core scheduler.
    *
    * @param storage Backing store implementation.
-   * @param cronJobManager Cron scheduler.
+   * @param cronScheduler Cron scheduler.
    * @param stateManager Persistent state manager.
    * @param taskIdGenerator Task ID generator.
    * @param quotaManager Quota manager.
@@ -94,13 +91,13 @@ class SchedulerCoreImpl implements SchedulerCore {
   @Inject
   public SchedulerCoreImpl(
       Storage storage,
-      CronJobManager cronJobManager,
+      CronJobManager cronScheduler,
       StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
       QuotaManager quotaManager) {
 
     this.storage = checkNotNull(storage);
-    this.cronJobManager = cronJobManager;
+    this.cronScheduler = cronScheduler;
     this.stateManager = checkNotNull(stateManager);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
     this.quotaManager = checkNotNull(quotaManager);
@@ -111,11 +108,7 @@ class SchedulerCoreImpl implements SchedulerCore {
         storage,
         Query.jobScoped(job.getKey()).active()).isEmpty();
 
-    return hasActiveTasks || cronJobManager.hasJob(job.getKey());
-  }
-
-  private static boolean isCron(SanitizedConfiguration config) {
-    return config.getJobConfig().isSetCronSchedule();
+    return hasActiveTasks || cronScheduler.hasJob(job.getKey());
   }
 
   @Override
@@ -127,19 +120,12 @@ class SchedulerCoreImpl implements SchedulerCore {
       protected void execute(MutableStoreProvider storeProvider) throws ScheduleException
{
         final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
         if (hasActiveJob(job)) {
-          throw new ScheduleException(
-              "Job already exists: " + JobKeys.canonicalString(job.getKey()));
+          throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
         }
 
         validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
 
-        if (isCron(sanitizedConfiguration)) {
-          try {
-            cronJobManager.createJob(SanitizedCronJob.from(sanitizedConfiguration));
-          } catch (CronException e) {
-            throw new ScheduleException(e);
-          }
-        } else {
+        if (!cronScheduler.receiveJob(sanitizedConfiguration)) {
           LOG.info("Launching " + sanitizedConfiguration.getTaskConfigs().size() + " tasks.");
           stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
         }
@@ -233,7 +219,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       // it.
       // TODO(maxim): Should be trivial to support killing multiple jobs instead.
       IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
-      cronDeleted = cronJobManager.deleteJob(jobKey);
+      cronDeleted = cronScheduler.deleteJob(jobKey);
     }
 
     // Unless statuses were specifically supplied, only attempt to kill active tasks.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 9db2a1a..7d26082 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -49,10 +49,17 @@ public class StateModule extends AbstractModule {
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
 
+    bindCronJobManager(binder());
     bindMaintenanceController(binder());
   }
 
   @VisibleForTesting
+  static void bindCronJobManager(Binder binder) {
+    binder.bind(CronJobManager.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
+  }
+
+  @VisibleForTesting
   static void bindMaintenanceController(Binder binder) {
     binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
     binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5de2b1ab/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 4e2dbeb..84151a5 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -100,14 +100,11 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
+import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -269,7 +266,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       schedulerCore.createJob(sanitized);
       response.setResponseCode(OK)
           .setMessage(String.format("%d new tasks pending for job %s",
-              sanitized.getJobConfig().getInstanceCount(), JobKeys.canonicalString(job.getKey())));
+              sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
     } catch (LockException e) {
       response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
     } catch (TaskDescriptionException | ScheduleException e) {
@@ -302,11 +299,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           ILockKey.build(LockKey.job(jobKey.newBuilder())),
           Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-      cronJobManager.updateJob(SanitizedCronJob.fromUnsanitized(job));
+      cronJobManager.updateJob(SanitizedConfiguration.fromUnsanitized(job));
       return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
     } catch (LockException e) {
       return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (CronException | TaskDescriptionException e) {
+    } catch (TaskDescriptionException | ScheduleException e) {
       return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
     }
   }
@@ -341,16 +338,21 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
-      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+      response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
+      return response;
     }
 
     try {
       cronJobManager.startJobNow(jobKey);
-      return response.setResponseCode(OK).setMessage("Cron run started.");
-    } catch (CronException e) {
-      return response.setResponseCode(INVALID_REQUEST)
+      response.setResponseCode(OK).setMessage("Cron run started.");
+    } catch (ScheduleException e) {
+      response.setResponseCode(INVALID_REQUEST)
           .setMessage("Failed to start cron job - " + e.getMessage());
+    } catch (TaskDescriptionException e) {
+      response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
     }
+
+    return response;
   }
 
   // TODO(William Farner): Provide status information about cron jobs here.
@@ -408,14 +410,13 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       @Override
       public JobSummary apply(IJobKey jobKey) {
         IJobConfiguration job = jobs.get(jobKey);
-        JobSummary summary = new JobSummary()
+        JobSummary smry = new JobSummary()
             .setJob(job.newBuilder())
             .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder());
 
         return Strings.isNullOrEmpty(job.getCronSchedule())
-            ? summary
-            : summary.setNextCronRunMs(
-                cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime());
+            ? smry
+            : smry.setNextCronRunMs(cronPredictor.predictNextRun(job.getCronSchedule()).getTime());
       }
     };
 
@@ -869,8 +870,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               jobsByKey(jobStore, existingJob.getKey());
           switch (matches.size()) {
             case 0:
-              error = Optional.of(
-                  "No jobs found for key " + JobKeys.canonicalString(existingJob.getKey()));
+              error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
               break;
 
             case 1:
@@ -878,16 +878,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                   Iterables.getOnlyElement(matches.entries());
               IJobConfiguration storedJob = match.getValue();
               if (!storedJob.equals(existingJob)) {
-                error = Optional.of(
-                    "CAS compare failed for " + JobKeys.canonicalString(storedJob.getKey()));
+                error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
               } else {
                 jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
               }
               break;
 
             default:
-              error = Optional.of("Multiple jobs found for key "
-                  + JobKeys.canonicalString(existingJob.getKey()));
+              error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
           }
         }
         break;


Mime
View raw message