aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Remove the failed JobManager abstraction.
Date Mon, 31 Mar 2014 18:40:23 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 20a14b90a -> 897659298


Remove the failed JobManager abstraction.

This removes a circular dependency between CronJobManager and SchedulerCoreImpl.

Reviewed at https://reviews.apache.org/r/19808/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/89765929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/89765929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/89765929

Branch: refs/heads/master
Commit: 897659298c2a7500a1a05944e0a51618c8af4b23
Parents: 20a14b9
Author: Bill Farner <wfarner@apache.org>
Authored: Mon Mar 31 11:24:39 2014 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Mon Mar 31 11:24:39 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/state/CronJobManager.java  | 127 +++++++++----------
 .../scheduler/state/ImmediateJobManager.java    |  61 ---------
 .../aurora/scheduler/state/JobManager.java      |  85 -------------
 .../aurora/scheduler/state/SchedulerCore.java   |   9 --
 .../scheduler/state/SchedulerCoreImpl.java      |  80 +++---------
 .../aurora/scheduler/state/StateModule.java     |   2 -
 .../thrift/SchedulerThriftInterface.java        |  11 +-
 .../state/BaseSchedulerCoreImplTest.java        |  32 ++---
 .../scheduler/state/CronJobManagerTest.java     |  46 +++----
 .../thrift/SchedulerThriftInterfaceTest.java    |   5 +-
 10 files changed, 119 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index 70ccf03..fa39e2b 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -48,9 +48,11 @@ import com.twitter.common.stats.Stats;
 import com.twitter.common.util.BackoffHelper;
 
 import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
@@ -62,22 +64,23 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.commons.lang.StringUtils;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+
 /**
  * A job scheduler that receives jobs that should be run periodically on a cron schedule.
  */
-public class CronJobManager extends JobManager implements EventSubscriber {
+public class CronJobManager implements EventSubscriber {
 
   public static final String MANAGER_KEY = "CRON";
 
   @VisibleForTesting
-  static final String CRON_USER = "cron";
+  static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cron");
 
   private static final Logger LOG = Logger.getLogger(CronJobManager.class.getName());
 
@@ -167,7 +170,8 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
   public void schedulerActive(SchedulerActive schedulerActive) {
     cron.startAsync().awaitRunning();
     shutdownRegistry.addAction(new Command() {
-      @Override public void execute() {
+      @Override
+      public void execute() {
         cron.stopAsync().awaitTerminated();
       }
     });
@@ -203,10 +207,10 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
    *         validation problems.
    */
   public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException
{
-    checkNotNull(jobKey);
-
     Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
-    checkArgument(jobConfig.isPresent(), "No such cron job " + JobKeys.toPath(jobKey));
+    if (!jobConfig.isPresent()) {
+      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
+    }
 
     cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
   }
@@ -234,7 +238,7 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
       delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
         @Override
         public Boolean get() {
-          if (!hasTasks(query)) {
+          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
             LOG.info("Initiating delayed launch of cron " + jobKey);
             SanitizedConfiguration config = pendingRuns.remove(jobKey);
             checkNotNull(config, "Failed to fetch job for delayed run of " + jobKey);
@@ -253,8 +257,18 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
     }
   }
 
-  private boolean hasTasks(Query.Builder query) {
-    return !Storage.Util.consistentFetchTasks(storage, query).isEmpty();
+  private void killActiveTasks(Set<String> taskIds) {
+    if (taskIds.isEmpty()) {
+      return;
+    }
+
+    for (String taskId : taskIds) {
+      stateManager.changeState(
+          taskId,
+          Optional.<ScheduleStatus>absent(),
+          KILLING,
+          KILL_AUDIT_MESSAGE);
+    }
   }
 
   public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
@@ -267,56 +281,48 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
    * @param cronJob The job to be triggered.
    */
   private void cronTriggered(SanitizedCronJob cronJob) {
-    SanitizedConfiguration config = cronJob.config;
-    IJobConfiguration job = config.getJobConfig();
+    final SanitizedConfiguration config = cronJob.config;
+    final IJobConfiguration job = config.getJobConfig();
     LOG.info(String.format("Cron triggered for %s at %s with policy %s",
         JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
     cronJobsTriggered.incrementAndGet();
 
-    ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
-    final Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
-    Set<IScheduledTask> activeTasks = Storage.Util.consistentFetchTasks(storage, activeQuery);
-
-    if (activeTasks.isEmpty()) {
-      builder.putAll(config.getTaskConfigs());
-    } else {
-      // Assign a default collision policy.
-      CronCollisionPolicy collisionPolicy = orDefault(job.getCronCollisionPolicy());
-
-      switch (collisionPolicy) {
-        case KILL_EXISTING:
-          try {
-            schedulerCore.killTasks(activeQuery, CRON_USER);
-            // Check immediately if the tasks are gone.  This could happen if the existing
tasks
-            // were pending.
-            if (!hasTasks(activeQuery)) {
-              builder.putAll(config.getTaskConfigs());
-            } else {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+        ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
+        Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
+        Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+
+        if (activeTasks.isEmpty()) {
+          builder.putAll(config.getTaskConfigs());
+        } else {
+          switch (orDefault(job.getCronCollisionPolicy())) {
+            case KILL_EXISTING:
+              killActiveTasks(activeTasks);
               delayedRun(activeQuery, config);
-            }
-          } catch (ScheduleException e) {
-            LOG.log(Level.SEVERE, "Failed to kill job.", e);
-          }
-          break;
+              break;
 
-        case CANCEL_NEW:
-          break;
+            case CANCEL_NEW:
+              break;
 
-        case RUN_OVERLAP:
-          LOG.severe("Ignoring trigger for job "
-              + JobKeys.toPath(job)
-              + " with deprecated collision policy RUN_OVERLAP due to unterminated active
tasks.");
-          break;
+            case RUN_OVERLAP:
+              LOG.severe(String.format(
+                  "Ignoring trigger for job %s with deprecated collision"
+                  + "policy RUN_OVERLAP due to unterminated active tasks.",
+                  JobKeys.toPath(job)));
+              break;
 
-        default:
-          LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
-      }
-    }
+            default:
+              LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
+          }
+        }
 
-    Map<Integer, ITaskConfig> newTasks = builder.build();
-    if (!newTasks.isEmpty()) {
-      stateManager.insertPendingTasks(newTasks);
-    }
+        Map<Integer, ITaskConfig> newTasks = builder.build();
+        if (!newTasks.isEmpty()) {
+          stateManager.insertPendingTasks(newTasks);
+        }
+      }
+    });
   }
 
   /**
@@ -331,22 +337,19 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
       throw new ScheduleException("A cron job may not be updated to a non-cron job.");
     }
     String key = scheduledJobs.remove(job.getKey());
-    checkNotNull(key, "Attempted to update unknown job " + JobKeys.toPath(job));
+    if (key == null) {
+      throw new ScheduleException(
+          "No cron template found for the given key: " + JobKeys.toPath(job));
+    }
     cron.deschedule(key);
     checkArgument(receiveJob(config));
   }
 
-  @Override
-  public String getUniqueKey() {
-    return MANAGER_KEY;
-  }
-
   private static boolean hasCronSchedule(IJobConfiguration job) {
     checkNotNull(job);
     return !StringUtils.isEmpty(job.getCronSchedule());
   }
 
-  @Override
   public boolean receiveJob(SanitizedConfiguration config) throws ScheduleException {
     final IJobConfiguration job = config.getJobConfig();
     if (!hasCronSchedule(job)) {
@@ -388,7 +391,6 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
     }
   }
 
-  @Override
   public Iterable<IJobConfiguration> getJobs() {
     return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>()
{
       @Override
@@ -398,7 +400,6 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
     });
   }
 
-  @Override
   public boolean hasJob(IJobKey jobKey) {
     return fetchJob(jobKey).isPresent();
   }
@@ -413,11 +414,9 @@ public class CronJobManager extends JobManager implements EventSubscriber
{
     });
   }
 
-  @Override
   public boolean deleteJob(final IJobKey jobKey) {
-    checkNotNull(jobKey);
-
-    if (!hasJob(jobKey)) {
+    Optional<IJobConfiguration> job = fetchJob(jobKey);
+    if (!job.isPresent()) {
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
deleted file mode 100644
index dc18b5b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/ImmediateJobManager.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Job scheduler that accepts any job and executes it immediately.
- */
-class ImmediateJobManager extends JobManager {
-
-  private static final Logger LOG = Logger.getLogger(ImmediateJobManager.class.getName());
-
-  private final StateManager stateManager;
-  private final Storage storage;
-
-  @Inject
-  ImmediateJobManager(StateManager stateManager, Storage storage) {
-    this.stateManager = checkNotNull(stateManager);
-    this.storage = checkNotNull(storage);
-  }
-
-  @Override
-  public String getUniqueKey() {
-    return "IMMEDIATE";
-  }
-
-  @Override
-  public boolean receiveJob(SanitizedConfiguration config) {
-    LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
-    stateManager.insertPendingTasks(config.getTaskConfigs());
-    return true;
-  }
-
-  @Override
-  public boolean hasJob(final IJobKey jobKey) {
-    return !Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()).isEmpty();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobManager.java b/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
deleted file mode 100644
index 8dba0a7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/JobManager.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Collections;
-
-import javax.inject.Inject;
-
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-
-/**
- * Interface for a job manager.  A job manager is responsible for deciding whether and when
to
- * trigger execution of a job.
- */
-public abstract class JobManager {
-
-  // TODO(Bill Farner): Remove this. It is only used since the CronJobManager and SchedulerCoreImpl
-  // have a circular dependency.
-  @Inject
-  protected SchedulerCore schedulerCore;
-
-  /**
-   * Gets a key that uniquely identifies this manager type, to distinguish from other schedulers.
-   * These keys end up being persisted, so they must be considered permanently immutable.
-   *
-   * @return Job manager key.
-   */
-  public abstract String getUniqueKey();
-
-  /**
-   * Submits a job to the manager.  The job may be submitted to the job runner before this
method
-   * returns or at any point in the future.  This method will return false if the manager
will not
-   * execute the job.
-   *
-   * @param config The job to schedule.
-   * @return {@code true} If the manager accepted the job, {@code false} otherwise.
-   * @throws ScheduleException If there is a problem with scheduling the job.
-   */
-  public abstract boolean receiveJob(SanitizedConfiguration config) throws ScheduleException;
-
-  /**
-   * Fetches the configured jobs that this manager is storing.
-   *
-   * @return Jobs stored by this job manager.
-   */
-  // TODO(ksweeney): Consider adding a Map<JobKey, JobConfiguration> to complement
this.
-  public Iterable<IJobConfiguration> getJobs() {
-    return Collections.emptyList();
-  }
-
-  /**
-   * Checks whether this manager is storing a job with the given key.
-   *
-   * @param jobKey Job key.
-   * @return {@code true} if the manager has a matching job, {@code false} otherwise.
-   */
-  public abstract boolean hasJob(IJobKey jobKey);
-
-  /**
-   * Instructs the manager to delete any jobs with the given key.
-   *
-   * @param jobKey Job key.
-   * @return {@code true} if a matching job was deleted.
-   */
-  public boolean deleteJob(IJobKey jobKey) {
-    // Optionally overridden by implementing class.
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 7a13a8b..8e3bfd3 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -63,15 +63,6 @@ public interface SchedulerCore {
       throws ScheduleException;
 
   /**
-   * Starts a cron job immediately.
-   *
-   * @param jobKey Job key.
-   * @throws ScheduleException If the specified job does not exist, or is not a cron job.
-   * @throws TaskDescriptionException If the parsing of the job failed.
-   */
-  void startCronJob(IJobKey jobKey) throws ScheduleException, TaskDescriptionException;
-
-  /**
    * Assigns a new state to tasks.
    *
    * @param taskId ID of the task to transition.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 3ca4ee5..9ebfd94 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -24,9 +24,7 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -41,7 +39,6 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
@@ -72,11 +69,10 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   private final Storage storage;
 
+  // TODO(wfarner): Consider changing this class to not be concerned with cron jobs, requiring
the
+  // caller to deal with the fork.
   private final CronJobManager cronScheduler;
 
-  // Schedulers that are responsible for triggering execution of jobs.
-  private final ImmutableList<JobManager> jobManagers;
-
   // State manager handles persistence of task modifications and state transitions.
   private final StateManager stateManager;
 
@@ -88,7 +84,6 @@ class SchedulerCoreImpl implements SchedulerCore {
    *
    * @param storage Backing store implementation.
    * @param cronScheduler Cron scheduler.
-   * @param immediateScheduler Immediate scheduler.
    * @param stateManager Persistent state manager.
    * @param taskIdGenerator Task ID generator.
    * @param quotaManager Quota manager.
@@ -97,16 +92,11 @@ class SchedulerCoreImpl implements SchedulerCore {
   public SchedulerCoreImpl(
       Storage storage,
       CronJobManager cronScheduler,
-      ImmediateJobManager immediateScheduler,
       StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
       QuotaManager quotaManager) {
 
     this.storage = checkNotNull(storage);
-
-    // The immediate scheduler will accept any job, so it's important that other schedulers
are
-    // placed first.
-    this.jobManagers = ImmutableList.of(cronScheduler, immediateScheduler);
     this.cronScheduler = cronScheduler;
     this.stateManager = checkNotNull(stateManager);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
@@ -114,7 +104,11 @@ class SchedulerCoreImpl implements SchedulerCore {
   }
 
   private boolean hasActiveJob(IJobConfiguration job) {
-    return Iterables.any(jobManagers, managerHasJob(job));
+    boolean hasActiveTasks = !Storage.Util.consistentFetchTasks(
+        storage,
+        Query.jobScoped(job.getKey()).active()).isEmpty();
+
+    return hasActiveTasks || cronScheduler.hasJob(job.getKey());
   }
 
   @Override
@@ -138,21 +132,9 @@ class SchedulerCoreImpl implements SchedulerCore {
 
         validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
 
-        boolean accepted = false;
-        // TODO(wfarner): Remove the JobManager abstraction, and directly invoke addInstances
-        // here for non-cron jobs.
-        for (final JobManager manager : jobManagers) {
-          if (manager.receiveJob(sanitizedConfiguration)) {
-            LOG.info("Job accepted by manager: " + manager.getUniqueKey());
-            accepted = true;
-            break;
-          }
-        }
-
-        if (!accepted) {
-          LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
-          LOG.severe("Discarded job: " + job);
-          throw new ScheduleException("Job not accepted, discarding.");
+        if (!cronScheduler.receiveJob(sanitizedConfiguration)) {
+          LOG.info("Launching " + sanitizedConfiguration.getTaskConfigs().size() + " tasks.");
+          stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
         }
       }
     });
@@ -219,34 +201,6 @@ class SchedulerCoreImpl implements SchedulerCore {
   }
 
   @Override
-  public synchronized void startCronJob(IJobKey jobKey)
-      throws ScheduleException, TaskDescriptionException {
-
-    checkNotNull(jobKey);
-
-    if (!cronScheduler.hasJob(jobKey)) {
-      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
-    }
-
-    cronScheduler.startJobNow(jobKey);
-  }
-
-  /**
-   * Creates a predicate that will determine whether a job manager has a job matching a job
key.
-   *
-   * @param job Job to match.
-   * @return A new predicate matching the job owner and name given.
-   */
-  private static Predicate<JobManager> managerHasJob(final IJobConfiguration job) {
-    return new Predicate<JobManager>() {
-      @Override
-      public boolean apply(JobManager manager) {
-        return manager.hasJob(job.getKey());
-      }
-    };
-  }
-
-  @Override
   public synchronized void setTaskStatus(
       String taskId,
       final ScheduleStatus status,
@@ -265,18 +219,14 @@ class SchedulerCoreImpl implements SchedulerCore {
     checkNotNull(query);
     LOG.info("Killing tasks matching " + query);
 
-    boolean jobDeleted = false;
+    boolean cronDeleted = false;
 
     if (Query.isSingleJobScoped(query)) {
-      // If this looks like a query for all tasks in a job, instruct the scheduler modules
to
-      // delete the job.
+      // If this looks like a query for all tasks in a job, instruct the cron scheduler to
delete
+      // it.
       // TODO(maxim): Should be trivial to support killing multiple jobs instead.
       IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
-      for (JobManager manager : jobManagers) {
-        if (manager.deleteJob(jobKey)) {
-          jobDeleted = true;
-        }
-      }
+      cronDeleted = cronScheduler.deleteJob(jobKey);
     }
 
     // Unless statuses were specifically supplied, only attempt to kill active tasks.
@@ -303,7 +253,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       }
     });
 
-    if (!jobDeleted && (tasksAffected == 0)) {
+    if (!cronDeleted && (tasksAffected == 0)) {
       throw new ScheduleException("No jobs to kill");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 4f0bbfa..7d26082 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -50,8 +50,6 @@ public class StateModule extends AbstractModule {
     bind(LockManagerImpl.class).in(Singleton.class);
 
     bindCronJobManager(binder());
-    bind(ImmediateJobManager.class).in(Singleton.class);
-
     bindMaintenanceController(binder());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 12d33ff..84151a5 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -299,15 +299,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           ILockKey.build(LockKey.job(jobKey.newBuilder())),
           Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-
-      if (!cronJobManager.hasJob(jobKey)) {
-        return response.setResponseCode(INVALID_REQUEST).setMessage(
-            "No cron template found for the given key: " + jobKey);
-      }
-      cronJobManager.updateJob(sanitized);
+      cronJobManager.updateJob(SanitizedConfiguration.fromUnsanitized(job));
       return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
-
     } catch (LockException e) {
       return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
     } catch (TaskDescriptionException | ScheduleException e) {
@@ -350,7 +343,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     }
 
     try {
-      schedulerCore.startCronJob(jobKey);
+      cronJobManager.startJobNow(jobKey);
       response.setResponseCode(OK).setMessage("Cron run started.");
     } catch (ScheduleException e) {
       response.setResponseCode(INVALID_REQUEST)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 033ebd2..f834bfb 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -207,17 +208,18 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest
{
         taskIdGenerator,
         eventSink,
         rescheduleCalculator);
-    ImmediateJobManager immediateManager = new ImmediateJobManager(stateManager, storage);
-    cron = new CronJobManager(stateManager, storage, cronScheduler, shutdownRegistry);
+    cron = new CronJobManager(
+        stateManager,
+        storage,
+        cronScheduler,
+        shutdownRegistry,
+        MoreExecutors.sameThreadExecutor());
     scheduler = new SchedulerCoreImpl(
         storage,
         cron,
-        immediateManager,
         stateManager,
         taskIdGenerator,
         quotaManager);
-    cron.schedulerCore = scheduler;
-    immediateManager.schedulerCore = scheduler;
   }
 
   @Test
@@ -472,7 +474,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(sanitizedConfiguration);
     assertTaskCount(0);
 
-    scheduler.startCronJob(jobKey);
+    cron.startJobNow(jobKey);
     assertEquals(PENDING, getOnlyTask(Query.jobScoped(jobKey)).getStatus());
   }
 
@@ -482,7 +484,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     control.replay();
     buildScheduler();
 
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
   }
 
   @Test
@@ -496,7 +498,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
 
     try {
-      scheduler.startCronJob(KEY_A);
+      cron.startJobNow(KEY_A);
       fail("Start should have failed.");
     } catch (ScheduleException e) {
       // Expected.
@@ -523,7 +525,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(sanitizedConfiguration);
     assertTaskCount(0);
 
-    scheduler.startCronJob(KEY_B);
+    cron.startJobNow(KEY_B);
   }
 
   @Test
@@ -543,13 +545,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest
{
     assertTaskCount(0);
     assertTrue(cron.hasJob(KEY_A));
 
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(1);
 
     String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
 
     // Now start the same cron job immediately.
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(1);
     assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
 
@@ -790,7 +792,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     assertTrue(cron.hasJob(KEY_A));
 
     // Simulate a triggering of the cron job.
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(10);
     assertEquals(10,
         getTasks(Query.jobScoped(KEY_A).byStatus(PENDING)).size());
@@ -827,13 +829,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest
{
     assertTrue(cron.hasJob(KEY_A));
 
     // Simulate a triggering of the cron job.
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(10);
 
     Set<String> taskIds = Tasks.ids(getTasksOwnedBy(OWNER_A));
 
     // Simulate a triggering of the cron job.
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(10);
     assertTrue(Sets.intersection(taskIds, Tasks.ids(getTasksOwnedBy(OWNER_A))).isEmpty());
 
@@ -943,7 +945,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     scheduler.createJob(config);
     assertTrue(cron.hasJob(KEY_A));
-    scheduler.startCronJob(KEY_A);
+    cron.startJobNow(KEY_A);
     assertTaskCount(10);
 
     scheduler.killTasks(Query.instanceScoped(KEY_A, 0), USER_A);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
index 380df10..fa9cb75 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
@@ -28,10 +28,12 @@ import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.JobKeys;
@@ -54,9 +56,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static org.apache.aurora.scheduler.state.CronJobManager.CRON_USER;
 import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -71,9 +71,10 @@ public class CronJobManagerTest extends EasyMockTest {
   private static final String ENVIRONMENT = "staging11";
   private static final String JOB_NAME = "jobName";
   private static final String DEFAULT_JOB_KEY = "key";
-  private static final IScheduledTask TASK = IScheduledTask.build(new ScheduledTask());
+  private static final String TASK_ID = "id";
+  private static final IScheduledTask TASK = IScheduledTask.build(
+      new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(TASK_ID)));
 
-  private SchedulerCore scheduler;
   private StateManager stateManager;
   private Executor delayExecutor;
   private Capture<Runnable> delayLaunchCapture;
@@ -87,7 +88,6 @@ public class CronJobManagerTest extends EasyMockTest {
 
   @Before
   public void setUp() throws Exception {
-    scheduler = createMock(SchedulerCore.class);
     stateManager = createMock(StateManager.class);
     delayExecutor = createMock(Executor.class);
     delayLaunchCapture = createCapture();
@@ -102,7 +102,6 @@ public class CronJobManagerTest extends EasyMockTest {
         cronScheduler,
         shutdownRegistry,
         delayExecutor);
-    cron.schedulerCore = scheduler;
     job = makeJob();
     sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
   }
@@ -155,7 +154,6 @@ public class CronJobManagerTest extends EasyMockTest {
         bind(Storage.class).toInstance(storageUtil.storage);
         bind(CronScheduler.class).toInstance(cronScheduler);
         bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-        bind(SchedulerCore.class).toInstance(scheduler);
         PubsubTestUtil.installPubsub(binder());
         StateModule.bindCronJobManager(binder());
       }
@@ -198,6 +196,16 @@ public class CronJobManagerTest extends EasyMockTest {
     assertTrue(cron.deleteJob(job.getKey()));
   }
 
+  private IExpectationSetters<?> expectTaskKilled(String id) {
+    expect(stateManager.changeState(
+        id,
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.KILLING,
+        CronJobManager.KILL_AUDIT_MESSAGE))
+        .andReturn(true);
+    return expectLastCall();
+  }
+
   @Test
   public void testDelayedStart() throws Exception {
     expectJobAccepted();
@@ -210,7 +218,7 @@ public class CronJobManagerTest extends EasyMockTest {
     delayExecutor.execute(capture(delayLaunchCapture));
 
     // The cron manager will then try to initiate the kill.
-    scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
+    expectTaskKilled(TASK_ID);
 
     // Immediate query and delayed query.
     expectActiveTaskFetch(TASK).times(2);
@@ -241,7 +249,7 @@ public class CronJobManagerTest extends EasyMockTest {
     delayExecutor.execute(capture(delayLaunchCapture));
 
     // The cron manager will then try to initiate the kill.
-    scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
+    expectTaskKilled(TASK_ID);
 
     // Immediate query and delayed query.
     expectActiveTaskFetch(TASK).times(2);
@@ -253,7 +261,7 @@ public class CronJobManagerTest extends EasyMockTest {
     expectJobFetch();
     expectActiveTaskFetch(TASK);
     delayExecutor.execute(capture(delayLaunchCapture));
-    scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
+    expectTaskKilled(TASK_ID);
     expectActiveTaskFetch(TASK).times(2);
     expectActiveTaskFetch();
 
@@ -286,8 +294,7 @@ public class CronJobManagerTest extends EasyMockTest {
     // The cron manager will then try to initiate the kill.
     expectJobFetch();
     expectJobFetch();
-    scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
-    expectLastCall().times(3);
+    expectTaskKilled(TASK_ID).times(3);
 
     // Immediate queries and delayed query.
     expectActiveTaskFetch(TASK).times(4);
@@ -404,21 +411,6 @@ public class CronJobManagerTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillExistingCollisionFailedKill() throws Exception {
-    IJobConfiguration killExisting = IJobConfiguration.build(
-        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING));
-    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
-    expectActiveTaskFetch(TASK);
-    scheduler.killTasks(Query.jobScoped(killExisting.getKey()).active(), CRON_USER);
-    expectLastCall().andThrow(new ScheduleException("injected"));
-
-    control.replay();
-
-    cron.receiveJob(new SanitizedConfiguration(killExisting));
-    jobTriggerCapture.getValue().run();
-  }
-
-  @Test
   public void testCancelNewCollision() throws Exception {
     IJobConfiguration killExisting = IJobConfiguration.build(
         job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/89765929/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index c89965c..fae2de1 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -663,7 +663,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplate() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(true);
     cronJobManager.updateJob(anyObject(SanitizedConfiguration.class));
     control.replay();
 
@@ -699,7 +698,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplateDoesNotExist() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+    cronJobManager.updateJob(
+        SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB)));
+    expectLastCall().andThrow(new ScheduleException("Nope"));
     control.replay();
 
     assertEquals(


Mime
View raw message