aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] git commit: Refactoring SchedulerCore final part.
Date Fri, 05 Sep 2014 22:14:44 GMT
Refactoring SchedulerCore final part.

Bugs closed: AURORA-94

Reviewed at https://reviews.apache.org/r/24815/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4920a8b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4920a8b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4920a8b8

Branch: refs/heads/master
Commit: 4920a8b8621a350f83e50a2a943b6b62f7f925cc
Parents: 277103b
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri Sep 5 15:14:28 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Sep 5 15:14:28 2014 -0700

----------------------------------------------------------------------
 .../scheduler/base/ScheduleException.java       |  31 -
 .../configuration/SanitizedConfiguration.java   |  24 +-
 .../scheduler/cron/quartz/AuroraCronJob.java    |  24 +-
 .../aurora/scheduler/state/SchedulerCore.java   |  57 --
 .../scheduler/state/SchedulerCoreImpl.java      | 203 -----
 .../aurora/scheduler/state/StateManager.java    |  10 +-
 .../scheduler/state/StateManagerImpl.java       |  26 +-
 .../aurora/scheduler/state/StateModule.java     |   5 +-
 .../scheduler/state/TaskLimitValidator.java     | 113 +++
 .../thrift/SchedulerThriftInterface.java        | 137 ++--
 .../thrift/org/apache/aurora/gen/api.thrift     |   1 +
 .../cron/quartz/AuroraCronJobTest.java          |  10 +-
 .../scheduler/http/JettyServerModuleTest.java   |   2 -
 .../state/BaseSchedulerCoreImplTest.java        | 783 -------------------
 .../scheduler/state/StateManagerImplTest.java   |  26 +-
 .../scheduler/state/TaskLimitValidatorTest.java | 117 +++
 .../scheduler/storage/StorageBackfillTest.java  |  11 +-
 .../mem/MemStorageSchedulerCoreImplTest.java    |  25 -
 .../thrift/SchedulerThriftInterfaceTest.java    | 493 +++++++-----
 .../aurora/scheduler/thrift/ThriftIT.java       |   4 +-
 .../org/apache/aurora/gen/api.thrift.md5        |   2 +-
 21 files changed, 706 insertions(+), 1398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/base/ScheduleException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/ScheduleException.java b/src/main/java/org/apache/aurora/scheduler/base/ScheduleException.java
deleted file mode 100644
index e060e5e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/base/ScheduleException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.base;
-
-/**
- * Exception class to signal a failure to schedule a task or job.
- */
-public class ScheduleException extends Exception {
-  public ScheduleException(String msg) {
-    super(msg);
-  }
-
-  public ScheduleException(String msg, Throwable t) {
-    super(msg, t);
-  }
-
-  public ScheduleException(Throwable t) {
-    super(t);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/configuration/SanitizedConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/SanitizedConfiguration.java b/src/main/java/org/apache/aurora/scheduler/configuration/SanitizedConfiguration.java
index d511ec0..4eb4437 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/SanitizedConfiguration.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/SanitizedConfiguration.java
@@ -13,20 +13,17 @@
  */
 package org.apache.aurora.scheduler.configuration;
 
-import java.util.Map;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Functions;
 import com.google.common.base.Objects;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.commons.lang.StringUtils;
 
 /**
@@ -37,22 +34,20 @@ public final class SanitizedConfiguration {
   private static final Logger LOG = Logger.getLogger(SanitizedConfiguration.class.getName());
 
   private final IJobConfiguration sanitized;
-  private final Map<Integer, ITaskConfig> tasks;
+  private final Set<Integer> instanceIds;
 
   /**
-   * Constructs a SanitizedConfiguration object and populates the set of {@link ITaskConfig}s for
-   * the provided config.
+   * Constructs a SanitizedConfiguration object and populates the set of instance IDs for
+   * the provided {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig}.
    *
    * @param sanitized A sanitized configuration.
    */
   @VisibleForTesting
   public SanitizedConfiguration(IJobConfiguration sanitized) {
     this.sanitized = sanitized;
-    this.tasks = Maps.toMap(
-        ContiguousSet.create(
-            Range.closedOpen(0, sanitized.getInstanceCount()),
-            DiscreteDomain.integers()),
-        Functions.constant(sanitized.getTaskConfig()));
+    this.instanceIds = ContiguousSet.create(
+        Range.closedOpen(0, sanitized.getInstanceCount()),
+        DiscreteDomain.integers());
   }
 
   /**
@@ -72,9 +67,8 @@ public final class SanitizedConfiguration {
     return sanitized;
   }
 
-  // TODO(William Farner): Rework this API now that all configs are identical.
-  public Map<Integer, ITaskConfig> getTaskConfigs() {
-    return tasks;
+  public Set<Integer> getInstanceIds() {
+    return instanceIds;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 6220281..9388657 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -14,7 +14,6 @@
 package org.apache.aurora.scheduler.cron.quartz;
 
 import java.util.Date;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -24,7 +23,6 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
 import com.twitter.common.base.Supplier;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.BackoffHelper;
@@ -94,11 +92,13 @@ class AuroraCronJob implements Job {
   }
 
   private static final class DeferredLaunch {
-    private final Map<Integer, ITaskConfig> pendingTasks;
+    private final ITaskConfig task;
+    private final Set<Integer> instanceIds;
     private final Set<String> activeTaskIds;
 
-    DeferredLaunch(Map<Integer, ITaskConfig> pendingTasks, Set<String> activeTaskIds) {
-      this.pendingTasks = pendingTasks;
+    DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) {
+      this.task = task;
+      this.instanceIds = instanceIds;
       this.activeTaskIds = activeTaskIds;
     }
   }
@@ -145,22 +145,22 @@ class AuroraCronJob implements Job {
                 "Cron triggered for %s at %s with policy %s", path, new Date(), collisionPolicy));
             CRON_JOB_TRIGGERS.incrementAndGet();
 
-            ImmutableMap<Integer, ITaskConfig> pendingTasks =
-                ImmutableMap.copyOf(cronJob.getSanitizedConfig().getTaskConfigs());
-
             final Query.Builder activeQuery = Query.jobScoped(key).active();
             Set<String> activeTasks =
                 Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
 
+            ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
+            Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
             if (activeTasks.isEmpty()) {
-              stateManager.insertPendingTasks(pendingTasks);
+              stateManager.insertPendingTasks(task, instanceIds);
+
               return Optional.absent();
             }
 
             CRON_JOB_COLLISIONS.incrementAndGet();
             switch (collisionPolicy) {
               case KILL_EXISTING:
-                return Optional.of(new DeferredLaunch(pendingTasks, activeTasks));
+                return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks));
 
               case RUN_OVERLAP:
                 LOG.severe(String.format("Ignoring trigger for job %s with deprecated collision"
@@ -200,7 +200,9 @@ class AuroraCronJob implements Job {
         public Boolean get() {
           if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
             LOG.info("Initiating delayed launch of cron " + path);
-            stateManager.insertPendingTasks(deferredLaunch.get().pendingTasks);
+            stateManager.insertPendingTasks(
+                deferredLaunch.get().task,
+                deferredLaunch.get().instanceIds);
             return true;
           } else {
             LOG.info("Not yet safe to run cron " + path);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
deleted file mode 100644
index c636fd7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Scheduling core, stores scheduler state and makes decisions about which tasks to schedule when
- * a resource offer is made.
- *
- * When a job is submitted to the scheduler core, it will store the job configuration and offer
- * the job to all configured scheduler modules, which are responsible for triggering execution of
- * the job.  Until a job is triggered by a scheduler module, it is retained in the scheduler core
- * in the PENDING state.
- */
-public interface SchedulerCore {
-
-  /**
-   * Creates a new job, whose tasks will become candidates for scheduling.
-   *
-   * @param sanitizedConfiguration The configuration of the job to create tasks for.
-   * @throws ScheduleException If there was an error scheduling a cron job.
-   * @throws TaskDescriptionException If an invalid task description was given.
-   */
-  void createJob(SanitizedConfiguration sanitizedConfiguration)
-      throws ScheduleException, TaskDescriptionException;
-
-  /**
-   * Adds new instances specified by the instances set.
-   * <p>
-   * Provided instance IDs should be disjoint from the instance IDs active in the job.
-   *
-   * @param jobKey IJobKey identifying the parent job.
-   * @param instanceIds Set of instance IDs to be added to the job.
-   * @param config ITaskConfig to use with new instances.
-   * @throws ScheduleException If any of the existing instance IDs already exist.
-   */
-  void addInstances(IJobKey jobKey, ImmutableSet<Integer> instanceIds, ITaskConfig config)
-      throws ScheduleException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
deleted file mode 100644
index 1e60fca..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Functions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.Positive;
-
-import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.quota.QuotaCheckResult;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
-
-/**
- * Implementation of the scheduler core.
- */
-class SchedulerCoreImpl implements SchedulerCore {
-  @Positive
-  @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
-  public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
-
-  private static final Logger LOG = Logger.getLogger(SchedulerCoreImpl.class.getName());
-
-  private final Storage storage;
-
-  // TODO(wfarner): Consider changing this class to not be concerned with cron jobs, requiring the
-  // caller to deal with the fork.
-  private final CronJobManager cronJobManager;
-
-  // State manager handles persistence of task modifications and state transitions.
-  private final StateManager stateManager;
-
-  private final TaskIdGenerator taskIdGenerator;
-  private final QuotaManager quotaManager;
-
-  /**
-   * Creates a new core scheduler.
-   *
-   * @param storage Backing store implementation.
-   * @param cronJobManager Cron scheduler.
-   * @param stateManager Persistent state manager.
-   * @param taskIdGenerator Task ID generator.
-   * @param quotaManager Quota manager.
-   */
-  @Inject
-  public SchedulerCoreImpl(
-      Storage storage,
-      CronJobManager cronJobManager,
-      StateManager stateManager,
-      TaskIdGenerator taskIdGenerator,
-      QuotaManager quotaManager) {
-
-    this.storage = requireNonNull(storage);
-    this.cronJobManager = cronJobManager;
-    this.stateManager = requireNonNull(stateManager);
-    this.taskIdGenerator = requireNonNull(taskIdGenerator);
-    this.quotaManager = requireNonNull(quotaManager);
-  }
-
-  private boolean hasActiveJob(IJobConfiguration job) {
-    boolean hasActiveTasks = !Storage.Util.consistentFetchTasks(
-        storage,
-        Query.jobScoped(job.getKey()).active()).isEmpty();
-
-    return hasActiveTasks || cronJobManager.hasJob(job.getKey());
-  }
-
-  @Override
-  public synchronized void createJob(final SanitizedConfiguration sanitizedConfiguration)
-      throws ScheduleException {
-
-    storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
-        final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-        if (hasActiveJob(job)) {
-          throw new ScheduleException(
-              "Job already exists: " + JobKeys.canonicalString(job.getKey()));
-        }
-
-        validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
-        // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454)
-        if (sanitizedConfiguration.isCron()) {
-          try {
-            LOG.warning("Deprecated behavior: scheduling job " + job.getKey()
-                + " with cron via createJob (AURORA_454)");
-            cronJobManager.createJob(SanitizedCronJob.from(sanitizedConfiguration));
-          } catch (CronException e) {
-            throw new ScheduleException(e);
-          }
-        } else {
-          LOG.info("Launching " + sanitizedConfiguration.getTaskConfigs().size() + " tasks.");
-          stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-        }
-      }
-    });
-  }
-
-  // This number is derived from the maximum file name length limit on most UNIX systems, less
-  // the number of characters we've observed being added by mesos for the executor ID, prefix, and
-  // delimiters.
-  @VisibleForTesting
-  static final int MAX_TASK_ID_LENGTH = 255 - 90;
-
-  /**
-   * Validates task specific requirements including name, count and quota checks.
-   * Must be performed inside of a write storage transaction along with state mutation change
-   * to avoid any data race conditions.
-   *
-   * @param task Task configuration.
-   * @param instances Number of task instances
-   * @throws ScheduleException If validation fails.
-   */
-  private void validateTaskLimits(ITaskConfig task, int instances)
-      throws ScheduleException {
-
-    // TODO(maximk): This is a short-term hack to stop the bleeding from
-    //               https://issues.apache.org/jira/browse/MESOS-691
-    if (taskIdGenerator.generate(task, instances).length() > MAX_TASK_ID_LENGTH) {
-      throw new ScheduleException(
-          "Task ID is too long, please shorten your role or job name.");
-    }
-
-    if (instances > MAX_TASKS_PER_JOB.get()) {
-      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
-    }
-
-    QuotaCheckResult quotaCheck = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        task,
-        instances);
-
-    if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
-      throw new ScheduleException("Insufficient resource quota: " + quotaCheck.getDetails().or(""));
-    }
-  }
-
-  @Override
-  public void addInstances(
-      final IJobKey jobKey,
-      final ImmutableSet<Integer> instanceIds,
-      final ITaskConfig config) throws ScheduleException {
-
-    storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
-        validateTaskLimits(config, instanceIds.size());
-
-        ImmutableSet<IScheduledTask> tasks =
-            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
-
-        Set<Integer> existingInstanceIds =
-            FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
-        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
-          throw new ScheduleException("Instance ID collision detected.");
-        }
-
-        stateManager.insertPendingTasks(Maps.asMap(instanceIds, Functions.constant(config)));
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 6e062b3..3a2fd27 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Optional;
@@ -70,12 +69,13 @@ public interface StateManager {
       Set<Integer> assignedPorts);
 
   /**
-   * Inserts new tasks into the store. Tasks will immediately move into PENDING and will be eligible
-   * for scheduling.
+   * Inserts pending instances using {@code task} as their configuration. Tasks will immediately
+   * move into PENDING and will be eligible for scheduling.
    *
-   * @param tasks Tasks to insert, mapped by their instance IDs.
+   * @param task Task template.
+   * @param instanceIds Instance IDs to assign to new PENDING tasks.
    */
-  void insertPendingTasks(Map<Integer, ITaskConfig> tasks);
+  void insertPendingTasks(ITaskConfig task, Set<Integer> instanceIds);
 
   /**
    * Attempts to delete tasks from the task store.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6ad104b..085454c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -18,7 +18,6 @@ import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -40,6 +39,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -49,6 +49,7 @@ import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -115,21 +116,32 @@ public class StateManagerImpl implements StateManager {
   }
 
   @Override
-  public void insertPendingTasks(final Map<Integer, ITaskConfig> tasks) {
-    requireNonNull(tasks);
+  public void insertPendingTasks(final ITaskConfig task, final Set<Integer> instanceIds) {
+    requireNonNull(task);
+    checkNotBlank(instanceIds);
 
     // Done outside the write transaction to minimize the work done inside a transaction.
-    final Set<IScheduledTask> scheduledTasks = FluentIterable.from(tasks.entrySet())
-        .transform(new Function<Entry<Integer, ITaskConfig>, IScheduledTask>() {
+    final Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
+        .transform(new Function<Integer, IScheduledTask>() {
           @Override
-          public IScheduledTask apply(Entry<Integer, ITaskConfig> entry) {
-            return createTask(entry.getKey(), entry.getValue());
+          public IScheduledTask apply(Integer instanceId) {
+            return createTask(instanceId, task);
           }
         }).toSet();
 
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       protected void execute(MutableStoreProvider storeProvider) {
+          ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
+            Query.jobScoped(JobKeys.from(task)).active());
+
+        Set<Integer> existingInstanceIds =
+            FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
+
+        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
+          throw new IllegalArgumentException("Instance ID collision detected.");
+        }
+
         storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
         for (IScheduledTask task : scheduledTasks) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 2c712ef..54b9012 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -24,6 +24,7 @@ import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
+import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskLimitValidatorImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
 /**
@@ -37,8 +38,6 @@ public class StateModule extends AbstractModule {
     bind(TaskAssignerImpl.class).in(Singleton.class);
     bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class);
 
-    bind(SchedulerCore.class).to(SchedulerCoreImpl.class).in(Singleton.class);
-
     bind(StateManager.class).to(StateManagerImpl.class);
     bind(StateManagerImpl.class).in(Singleton.class);
 
@@ -46,6 +45,8 @@ public class StateModule extends AbstractModule {
     bind(UUIDGeneratorImpl.class).in(Singleton.class);
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
+    bind(TaskLimitValidator.class).to(TaskLimitValidatorImpl.class);
+    bind(TaskLimitValidatorImpl.class).in(Singleton.class);
 
     bindMaintenanceController(binder());
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java b/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
new file mode 100644
index 0000000..779e925
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
+
+import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+
+/**
+ * Validates task-specific requirements including name, count and quota checks.
+ */
+public interface TaskLimitValidator {
+
+  /**
+   * Validates that adding {@code newInstances} of {@code task} does not violate task/job limits.
+   * <p>
+   * Validated rules:
+   * <ul>
+   *   <li>Max task ID length</li>
+   *   <li>Max number of tasks per job</li>
+   *   <li>Role resource quota</li>
+   * </ul>
+   *
+   * @param task Task configuration.
+   * @param newInstances Number of new task instances.
+   * @throws TaskValidationException If validation fails.
+   */
+  void validateTaskLimits(ITaskConfig task, int newInstances) throws TaskValidationException;
+
+  /**
+   * Thrown when task fails validation.
+   */
+  class TaskValidationException extends Exception {
+    public TaskValidationException(String msg) {
+      super(msg);
+    }
+  }
+
+  class TaskLimitValidatorImpl implements TaskLimitValidator {
+
+    @Positive
+    @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
+    public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
+
+    // This number is derived from the maximum file name length limit on most UNIX systems, less
+    // the number of characters we've observed being added by mesos for the executor ID, prefix, and
+    // delimiters.
+    @VisibleForTesting
+    static final int MAX_TASK_ID_LENGTH = 255 - 90;
+
+    private final TaskIdGenerator taskIdGenerator;
+    private final QuotaManager quotaManager;
+
+    @Inject
+    TaskLimitValidatorImpl(TaskIdGenerator taskIdGenerator, QuotaManager quotaManager) {
+      this.taskIdGenerator = requireNonNull(taskIdGenerator);
+      this.quotaManager = requireNonNull(quotaManager);
+    }
+
+    @Override
+    public void validateTaskLimits(ITaskConfig task, int newInstances)
+        throws TaskValidationException {
+
+      // TODO(maximk): This is a short-term hack to stop the bleeding from
+      //               https://issues.apache.org/jira/browse/MESOS-691
+      if (taskIdGenerator.generate(task, newInstances).length() > MAX_TASK_ID_LENGTH) {
+        throw new TaskValidationException(
+            "Task ID is too long, please shorten your role or job name.");
+      }
+
+      // TODO(maximk): This check must consider ALL existing tasks not just the new instances.
+      if (newInstances > MAX_TASKS_PER_JOB.get()) {
+        throw new TaskValidationException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
+      }
+
+      QuotaCheckResult quotaCheck = quotaManager.checkQuota(
+          ImmutableMap.<ITaskConfig, Integer>of(),
+          task,
+          newInstances);
+
+      if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
+        throw new TaskValidationException("Insufficient resource quota: "
+            + quotaCheck.getDetails().or(""));
+      }
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 9171179..a43e5d7 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -105,7 +105,6 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Jobs;
 import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
@@ -123,8 +122,8 @@ import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskLimitValidator;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -169,6 +168,7 @@ import static org.apache.aurora.gen.ResponseCode.OK;
 import static org.apache.aurora.gen.ResponseCode.WARNING;
 import static org.apache.aurora.gen.apiConstants.CURRENT_API_VERSION;
 import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+import static org.apache.aurora.scheduler.state.TaskLimitValidator.TaskValidationException;
 import static org.apache.aurora.scheduler.thrift.Util.addMessage;
 import static org.apache.aurora.scheduler.thrift.Util.emptyResponse;
 
@@ -192,7 +192,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       Tasks.SCHEDULED_TO_INFO);
 
   private final NonVolatileStorage storage;
-  private final SchedulerCore schedulerCore;
   private final LockManager lockManager;
   private final CapabilityValidator sessionValidator;
   private final StorageBackup backup;
@@ -203,13 +202,13 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final QuotaManager quotaManager;
   private final NearestFit nearestFit;
   private final StateManager stateManager;
+  private final TaskLimitValidator taskLimitValidator;
   private final UUIDGenerator uuidGenerator;
   private final JobUpdateController jobUpdateController;
 
   @Inject
   SchedulerThriftInterface(
       NonVolatileStorage storage,
-      SchedulerCore schedulerCore,
       LockManager lockManager,
       CapabilityValidator sessionValidator,
       StorageBackup backup,
@@ -220,11 +219,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       QuotaManager quotaManager,
       NearestFit nearestFit,
       StateManager stateManager,
+      TaskLimitValidator taskLimitValidator,
       UUIDGenerator uuidGenerator,
       JobUpdateController jobUpdateController) {
 
     this(storage,
-        schedulerCore,
         lockManager,
         sessionValidator,
         backup,
@@ -235,6 +234,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         quotaManager,
         nearestFit,
         stateManager,
+        taskLimitValidator,
         uuidGenerator,
         jobUpdateController);
   }
@@ -242,7 +242,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   @VisibleForTesting
   SchedulerThriftInterface(
       NonVolatileStorage storage,
-      SchedulerCore schedulerCore,
       LockManager lockManager,
       CapabilityValidator sessionValidator,
       StorageBackup backup,
@@ -253,11 +252,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       QuotaManager quotaManager,
       NearestFit nearestFit,
       StateManager stateManager,
+      TaskLimitValidator taskLimitValidator,
       UUIDGenerator uuidGenerator,
       JobUpdateController jobUpdateController) {
 
     this.storage = requireNonNull(storage);
-    this.schedulerCore = requireNonNull(schedulerCore);
     this.lockManager = requireNonNull(lockManager);
     this.sessionValidator = requireNonNull(sessionValidator);
     this.backup = requireNonNull(backup);
@@ -268,6 +267,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.quotaManager = requireNonNull(quotaManager);
     this.nearestFit = requireNonNull(nearestFit);
     this.stateManager = requireNonNull(stateManager);
+    this.taskLimitValidator = requireNonNull(taskLimitValidator);
     this.uuidGenerator = requireNonNull(uuidGenerator);
     this.jobUpdateController = requireNonNull(jobUpdateController);
   }
@@ -275,36 +275,65 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   @Override
   public Response createJob(
       JobConfiguration mutableJob,
-      @Nullable Lock mutableLock,
+      @Nullable final Lock mutableLock,
       SessionKey session) {
 
-    IJobConfiguration job = IJobConfiguration.build(mutableJob);
-    IJobKey jobKey = JobKeys.assertValid(job.getKey());
     requireNonNull(session);
+    final Response response = emptyResponse();
 
-    Response response = Util.emptyResponse();
-
+    final SanitizedConfiguration sanitized;
     try {
-      sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
+      sessionValidator.checkAuthenticated(
+          session,
+          ImmutableSet.of(mutableJob.getOwner().getRole()));
+      sanitized = SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(mutableJob));
     } catch (AuthFailedException e) {
       return addMessage(response, AUTH_FAILED, e);
+    } catch (TaskDescriptionException e) {
+      return addMessage(response, INVALID_REQUEST, e);
     }
 
-    try {
-      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        final IJobConfiguration job = sanitized.getJobConfig();
 
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+        try {
+          lockManager.validateIfLocked(
+              ILockKey.build(LockKey.job(job.getKey().newBuilder())),
+              Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-      schedulerCore.createJob(sanitized);
-      response.setResponseCode(OK);
-    } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e);
-    } catch (TaskDescriptionException | ScheduleException e) {
-      addMessage(response, INVALID_REQUEST, e);
-    }
-    return response;
+          if (!storeProvider.getTaskStore().fetchTasks(
+              Query.jobScoped(job.getKey()).active()).isEmpty()
+              || cronJobManager.hasJob(job.getKey())) {
+
+            return addMessage(
+                response,
+                INVALID_REQUEST,
+                "Job already exists: " + JobKeys.canonicalString(job.getKey()));
+          }
+
+          taskLimitValidator.validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
+
+          // TODO(mchucarroll): Deprecate cron as a part of create/kill job (AURORA-454).
+          if (sanitized.isCron()) {
+            LOG.warning("Deprecated behavior: scheduling job " + job.getKey()
+                + " with cron via createJob (AURORA_454)");
+            cronJobManager.createJob(SanitizedCronJob.from(sanitized));
+          } else {
+            LOG.info("Launching " + sanitized.getInstanceIds().size() + " tasks.");
+            stateManager.insertPendingTasks(
+                sanitized.getJobConfig().getTaskConfig(),
+                sanitized.getInstanceIds());
+          }
+          return response.setResponseCode(OK);
+        } catch (LockException e) {
+          return addMessage(response, LOCK_ERROR, e);
+        } catch (CronException | TaskValidationException e) {
+          return addMessage(response, INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   @Override
@@ -430,7 +459,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
 
       PopulateJobResult result = new PopulateJobResult()
-          .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values()));
+          .setPopulated(ImmutableSet.of(sanitized.getJobConfig().getTaskConfig().newBuilder()));
 
       response.setResult(Result.populateJobResult(result));
       response.setResponseCode(OK);
@@ -1175,38 +1204,52 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
   @Override
   public Response addInstances(
-      AddInstancesConfig config,
-      @Nullable Lock mutableLock,
-      SessionKey session) {
+      final AddInstancesConfig config,
+      @Nullable final Lock mutableLock,
+      final SessionKey session) {
 
     requireNonNull(config);
     requireNonNull(session);
     checkNotBlank(config.getInstanceIds());
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
+    final IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
+    final Response resp = emptyResponse();
 
-    Response resp = Util.emptyResponse();
+    final ITaskConfig task;
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-      ITaskConfig task = ConfigurationManager.validateAndPopulate(
+      task = ConfigurationManager.validateAndPopulate(
           ITaskConfig.build(config.getTaskConfig()));
-
-      if (cronJobManager.hasJob(jobKey)) {
-        return addMessage(resp, INVALID_REQUEST, "Instances may not be added to cron jobs.");
-      }
-
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-
-      schedulerCore.addInstances(jobKey, ImmutableSet.copyOf(config.getInstanceIds()), task);
-      return resp.setResponseCode(OK);
     } catch (AuthFailedException e) {
       return addMessage(resp, AUTH_FAILED, e);
-    } catch (LockException e) {
-      return addMessage(resp, LOCK_ERROR, e);
-    } catch (TaskDescriptionException | ScheduleException e) {
+    } catch (TaskDescriptionException e) {
       return addMessage(resp, INVALID_REQUEST, e);
     }
+
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          if (cronJobManager.hasJob(jobKey)) {
+            return addMessage(resp, INVALID_REQUEST, "Instances may not be added to cron jobs.");
+          }
+
+          lockManager.validateIfLocked(
+              ILockKey.build(LockKey.job(jobKey.newBuilder())),
+              Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+
+          Set<Integer> instanceIds = ImmutableSet.copyOf(config.getInstanceIds());
+          taskLimitValidator.validateTaskLimits(task, instanceIds.size());
+
+          stateManager.insertPendingTasks(task, instanceIds);
+
+          return resp.setResponseCode(OK);
+        } catch (LockException e) {
+          return addMessage(resp, LOCK_ERROR, e);
+        } catch (TaskValidationException | IllegalArgumentException e) {
+          return addMessage(resp, INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   private String getRoleFromLockKey(ILockKey lockKey) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index a78a4d8..c00f943 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -301,6 +301,7 @@ struct ConfigSummary {
 }
 
 struct PopulateJobResult {
+  // TODO(maxim): Convert to a single TaskConfig as there is always a single element returned.
   1: set<TaskConfig> populated
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index c7ae7db..b6b1bcb 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.scheduler.cron.quartz;
 
-import java.util.Map;
+import java.util.Set;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
@@ -90,7 +90,9 @@ public class AuroraCronJobTest extends EasyMockTest {
 
   @Test
   public void testEmptyStorage() throws JobExecutionException {
-    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+    stateManager.insertPendingTasks(
+        EasyMock.<ITaskConfig>anyObject(),
+        EasyMock.<Set<Integer>>anyObject());
     expectLastCall().times(3);
 
     control.replay();
@@ -126,7 +128,9 @@ public class AuroraCronJobTest extends EasyMockTest {
         AuroraCronJob.KILL_AUDIT_MESSAGE))
         .andReturn(true);
     backoffHelper.doUntilSuccess(EasyMock.capture(capture));
-    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+    stateManager.insertPendingTasks(
+        EasyMock.<ITaskConfig>anyObject(),
+        EasyMock.<Set<Integer>>anyObject());
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
index 62dce07..e24de79 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
@@ -52,7 +52,6 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.state.LockManager;
-import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -113,7 +112,6 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
             bindMock(OfferQueue.class);
             bindMock(QuotaManager.class);
             bindMock(RescheduleCalculator.class);
-            bindMock(SchedulerCore.class);
             bindMock(TaskScheduler.class);
             bindMock(Thread.UncaughtExceptionHandler.class);
           }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
deleted file mode 100644
index b22b390..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ /dev/null
@@ -1,783 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ContiguousSet;
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import com.twitter.common.collections.Pair;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskConstraint;
-import org.apache.aurora.gen.ValueConstraint;
-import org.apache.aurora.scheduler.Driver;
-import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.quota.QuotaCheckResult;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.StorageBackfill;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.configuration.ConfigurationManager.hostLimitConstraint;
-import static org.apache.aurora.scheduler.configuration.ConfigurationManager.validateAndPopulate;
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Base integration test for the SchedulerCoreImpl, subclasses should supply a concrete Storage
- * system.
- */
-public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
-
-  private static final String ROLE_A = "Test_Role_A";
-  private static final String USER_A = "Test_User_A";
-  private static final Identity OWNER_A = new Identity(ROLE_A, USER_A);
-  private static final String ENV_A = "Test_Env_A";
-  private static final String JOB_A = "Test_Job_A";
-  private static final IJobKey KEY_A = JobKeys.from(ROLE_A, ENV_A, JOB_A);
-
-  private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
-  private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
-
-  private Driver driver;
-  private StateManagerImpl stateManager;
-  private Storage storage;
-  private SchedulerCoreImpl scheduler;
-  private CronJobManager cronJobManager;
-  private FakeClock clock;
-  private EventSink eventSink;
-  private RescheduleCalculator rescheduleCalculator;
-  private QuotaManager quotaManager;
-
-  // TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
-  private final AtomicLong idCounter = new AtomicLong();
-  private TaskIdGenerator taskIdGenerator = new TaskIdGenerator() {
-    @Override
-    public String generate(ITaskConfig input, int instanceId) {
-      return "task-" + idCounter.incrementAndGet();
-    }
-  };
-
-  @Before
-  public void setUp() throws Exception {
-    driver = createMock(Driver.class);
-    clock = new FakeClock();
-    eventSink = createMock(EventSink.class);
-    rescheduleCalculator = createMock(RescheduleCalculator.class);
-    cronJobManager = createMock(CronJobManager.class);
-    quotaManager = createMock(QuotaManager.class);
-
-    eventSink.post(EasyMock.<PubsubEvent>anyObject());
-    expectLastCall().anyTimes();
-
-    expect(quotaManager.checkQuota(
-        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
-        anyObject(ITaskConfig.class),
-        anyInt())).andStubReturn(ENOUGH_QUOTA);
-
-    expect(cronJobManager.hasJob(anyObject(IJobKey.class))).andStubReturn(false);
-  }
-
-  /**
-   * Subclasses should create the {@code Storage} implementation to be used by the
-   * {@link SchedulerCoreImpl} under test.
-   *
-   * @return the {@code Storage} for the SchedulerCoreImpl to use under tests
-   * @throws Exception if there is a problem creating the storage implementation
-   */
-  protected abstract Storage createStorage() throws Exception;
-
-  private void buildScheduler() throws Exception {
-    buildScheduler(createStorage());
-  }
-
-  // TODO(ksweeney): Use Guice to instantiate everything here.
-  private void buildScheduler(Storage newStorage) throws Exception {
-    this.storage = newStorage;
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        StorageBackfill.backfill(storeProvider, clock);
-      }
-    });
-
-    stateManager = new StateManagerImpl(
-        storage,
-        clock,
-        driver,
-        taskIdGenerator,
-        eventSink,
-        rescheduleCalculator);
-    scheduler = new SchedulerCoreImpl(
-        storage,
-        cronJobManager,
-        stateManager,
-        taskIdGenerator,
-        quotaManager);
-  }
-
-  @Test
-  public void testCreateJob() throws Exception {
-    int numTasks = 10;
-
-    control.replay();
-    buildScheduler();
-
-    SanitizedConfiguration job = makeJob(KEY_A, numTasks);
-    scheduler.createJob(job);
-    assertTaskCount(numTasks);
-
-    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
-    assertEquals(numTasks, tasks.size());
-    for (IScheduledTask state : tasks) {
-      assertEquals(PENDING, state.getStatus());
-      assertTrue(state.getAssignedTask().isSetTaskId());
-      assertFalse(state.getAssignedTask().isSetSlaveId());
-      assertEquals(
-          validateAndPopulate(job.getJobConfig()).getTaskConfig(),
-          state.getAssignedTask().getTask());
-    }
-    Set<Integer> expectedInstanceIds =
-        ContiguousSet.create(Range.closedOpen(0, numTasks), DiscreteDomain.integers());
-    assertEquals(
-        expectedInstanceIds,
-        FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet());
-  }
-
-  @Test
-  public void testCreateJobEmptyString() throws Exception {
-    // TODO(ksweeney): Deprecate this as part of AURORA-423.
-
-    control.replay();
-    buildScheduler();
-
-    SanitizedConfiguration job = SanitizedConfiguration.fromUnsanitized(
-        IJobConfiguration.build(makeJob(KEY_A, 1).getJobConfig().newBuilder().setCronSchedule("")));
-    scheduler.createJob(job);
-    assertTaskCount(1);
-  }
-
-  private static Constraint dedicatedConstraint(Set<String> values) {
-    return new Constraint(DEDICATED_ATTRIBUTE,
-        TaskConstraint.value(new ValueConstraint(false, values)));
-  }
-
-  @Test
-  public void testDedicatedJob() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    TaskConfig newTask = nonProductionTask();
-    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(ROLE_A)));
-    scheduler.createJob(makeJob(KEY_A, newTask));
-    assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
-  }
-
-  @Test
-  public void testDedicatedJobKey() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    TaskConfig newTask = nonProductionTask();
-    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.canonicalString(KEY_A))));
-    scheduler.createJob(makeJob(KEY_A, newTask));
-    assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
-  }
-
-  @Test
-  public void testDedicatedArbitrarySuffix() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    TaskConfig newTask = nonProductionTask();
-    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(ROLE_A + "/arbitrary")));
-    scheduler.createJob(makeJob(KEY_A, newTask, 1));
-    assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
-  }
-
-  @Test
-  public void testRejectsBadIdentifiers() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    Identity validIdentity = new Identity("foo", "bar");
-    Identity[] invalidIdentities = {
-      new Identity().setRole("foo"),
-      new Identity("foo/", "bar"),
-      new Identity("foo", "&bar"),
-      new Identity().setUser("bar")
-    };
-
-    String validJob = "baz";
-    String[] invalidIdentifiers = {"&baz", "/baz", "baz&", ""};
-
-    for (Identity ident : invalidIdentities) {
-      for (String env : invalidIdentifiers) {
-        for (String job : invalidIdentifiers) {
-          // Subvert JobKeys.from to avoid IllegalArgumentExceptions.
-          expectRejected(ident, IJobKey.build(new JobKey()
-              .setRole(ident.getRole())
-              .setEnvironment(env)
-              .setName(job)));
-        }
-      }
-    }
-
-    for (String jobName : invalidIdentifiers) {
-      expectRejected(validIdentity, IJobKey.build(new JobKey()
-          .setRole(validIdentity.getRole())
-          .setEnvironment(validJob)
-          .setName(jobName)));
-    }
-
-    for (Identity ident : invalidIdentities) {
-      expectRejected(ident, KEY_A);
-    }
-  }
-
-  private void expectRejected(Identity identity, IJobKey jobKey) throws ScheduleException {
-    try {
-      scheduler.createJob(SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(
-          makeJob(jobKey, 1).getJobConfig().newBuilder().setOwner(identity))));
-      fail("Job owner/name should have been rejected.");
-    } catch (TaskDescriptionException e) {
-      // Expected.
-    }
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testCreateDuplicateJob() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    assertTaskCount(1);
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-  }
-
-  private IExpectationSetters<Long> expectTaskNotThrottled() {
-    return expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
-        .andReturn(0L);
-  }
-
-  @Test
-  public void testServiceTasksRescheduled() throws Exception {
-    int numServiceTasks = 5;
-    IJobKey adhocKey = KEY_A;
-    IJobKey serviceKey = IJobKey.build(
-        adhocKey.newBuilder().setName(adhocKey.getName() + "service"));
-
-    expectTaskNotThrottled().times(numServiceTasks);
-    expectNoCronJob(adhocKey);
-    expectNoCronJob(serviceKey);
-
-    control.replay();
-    buildScheduler();
-
-    // Schedule 5 service and 5 non-service tasks.
-    scheduler.createJob(makeJob(adhocKey, numServiceTasks));
-    TaskConfig task = productionTask().setIsService(true);
-    scheduler.createJob(makeJob(serviceKey, task, 5));
-
-    assertEquals(10, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
-    assertEquals(10, getTasksByStatus(STARTING).size());
-
-    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
-    assertEquals(10, getTasksByStatus(RUNNING).size());
-
-    // Service tasks will move back into PENDING state after finishing.
-    changeStatus(Query.roleScoped(ROLE_A), FINISHED);
-    Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
-    assertEquals(5, newTasks.size());
-    for (IScheduledTask state : newTasks) {
-      assertEquals(
-          getTask(state.getAncestorId()).getAssignedTask().getInstanceId(),
-          state.getAssignedTask().getInstanceId());
-    }
-
-    assertEquals(10, getTasksByStatus(FINISHED).size());
-  }
-
-  @Test
-  public void testServiceTaskIgnoresMaxFailures() throws Exception {
-    int totalFailures = 10;
-
-    expectTaskNotThrottled().times(totalFailures);
-    expectNoCronJob(KEY_A);
-
-    control.replay();
-    buildScheduler();
-
-    int maxFailures = 5;
-
-    // Schedule a service task.
-    TaskConfig task = productionTask()
-        .setIsService(true)
-        .setMaxTaskFailures(maxFailures);
-    scheduler.createJob(makeJob(KEY_A, task, 1));
-    assertTaskCount(1);
-
-    // Fail the task more than maxFailures.
-    for (int i = 1; i <= totalFailures; i++) {
-      String taskId = Tasks.id(
-          getOnlyTask(Query.jobScoped(KEY_A).active()));
-
-      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
-      assertEquals(i - 1, getTask(taskId).getFailureCount());
-      changeStatus(taskId, FAILED);
-
-      assertTaskCount(i + 1);
-      IScheduledTask rescheduled = getOnlyTask(Query.unscoped().byStatus(PENDING));
-      assertEquals(i, rescheduled.getFailureCount());
-    }
-
-    assertEquals(totalFailures, getTasksByStatus(FAILED).size());
-    assertEquals(1, getTasksByStatus(PENDING).size());
-  }
-
-  @Test
-  public void testTaskRescheduleOnKill() throws Exception {
-    int numServiceTasks = 5;
-
-    expectTaskNotThrottled().times(numServiceTasks);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, numServiceTasks));
-
-    assertEquals(5, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
-    assertEquals(5, getTasksByStatus(STARTING).size());
-    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
-    assertEquals(5, getTasksByStatus(RUNNING).size());
-
-    // All tasks will move back into PENDING state after getting KILLED.
-    changeStatus(Query.roleScoped(ROLE_A), KILLED);
-    Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
-    assertEquals(5, newTasks.size());
-    assertEquals(5, getTasksByStatus(KILLED).size());
-  }
-
-  @Test
-  public void testFailedTaskIncrementsFailureCount() throws Exception {
-    int maxFailures = 5;
-    expectTaskNotThrottled().times(maxFailures - 1);
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
-    control.replay();
-    buildScheduler();
-
-    TaskConfig task = productionTask().setMaxTaskFailures(maxFailures);
-    scheduler.createJob(makeJob(KEY_A, task, 1));
-    assertTaskCount(1);
-
-    assertEquals(1, getTasks(Query.jobScoped(KEY_A)).size());
-
-    for (int i = 1; i <= maxFailures; i++) {
-      String taskId = Tasks.id(getOnlyTask(
-          Query.jobScoped(KEY_A).active()));
-
-      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
-      assertEquals(i - 1, getTask(taskId).getFailureCount());
-      changeStatus(taskId, FAILED);
-
-      if (i != maxFailures) {
-        assertTaskCount(i + 1);
-        IScheduledTask rescheduled = getOnlyTask(Query.unscoped().byStatus(PENDING));
-        assertEquals(i, rescheduled.getFailureCount());
-      } else {
-        assertTaskCount(maxFailures);
-      }
-    }
-
-    assertEquals(maxFailures, getTasksByStatus(FAILED).size());
-    assertTrue(getTasksByStatus(PENDING).isEmpty());
-  }
-
-  @Test
-  public void testLostTaskRescheduled() throws Exception {
-    expectKillTask(2);
-    expectTaskNotThrottled().times(2);
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    assertTaskCount(1);
-
-    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
-    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    assertEquals(1, tasks.size());
-
-    changeStatus(taskId, ASSIGNED, LOST);
-
-    String newTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
-    assertFalse(newTaskId.equals(taskId));
-
-    changeStatus(newTaskId, ASSIGNED, LOST);
-    assertFalse(newTaskId.equals(Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)))));
-  }
-
-  @Test
-  public void testIsStrictlyJobScoped() throws Exception {
-    // TODO(Sathya): Remove this after adding a unit test for Query utility class.
-    control.replay();
-    assertTrue(Query.isSingleJobScoped(Query.jobScoped(KEY_A)));
-    assertFalse(Query.isSingleJobScoped(Query.jobScoped(KEY_A).byId("xyz")));
-  }
-
-  @Test
-  public void testAuditMessage() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-
-    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED, STARTING);
-    changeStatus(taskId, FAILED, Optional.of("bad stuff happened"));
-
-    String hostname = getLocalHost();
-
-    Iterator<Pair<ScheduleStatus, String>> expectedEvents =
-        ImmutableList.<Pair<ScheduleStatus, String>>builder()
-            .add(Pair.<ScheduleStatus, String>of(PENDING, null))
-            .add(Pair.<ScheduleStatus, String>of(ASSIGNED, null))
-            .add(Pair.<ScheduleStatus, String>of(STARTING, null))
-            .add(Pair.<ScheduleStatus, String>of(FAILED, "bad stuff happened"))
-            .build()
-        .iterator();
-    for (ITaskEvent event : getTask(taskId).getTaskEvents()) {
-      Pair<ScheduleStatus, String> expected = expectedEvents.next();
-      assertEquals(expected.getFirst(), event.getStatus());
-      assertEquals(expected.getSecond(), event.getMessage());
-      assertEquals(hostname, event.getScheduler());
-    }
-  }
-
-  @Test
-  public void testTaskIdLimit() throws Exception {
-    taskIdGenerator = new TaskIdGenerator() {
-      @Override
-      public String generate(ITaskConfig input, int instanceCount) {
-        return Strings.repeat("a", SchedulerCoreImpl.MAX_TASK_ID_LENGTH);
-      }
-    };
-
-    expectNoCronJob(KEY_A);
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testRejectLongTaskId() throws Exception {
-    taskIdGenerator = new TaskIdGenerator() {
-      @Override
-      public String generate(ITaskConfig input, int instanceCount) {
-        return Strings.repeat("a", SchedulerCoreImpl.MAX_TASK_ID_LENGTH + 1);
-      }
-    };
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testFilterFailRejectsCreate() throws Exception {
-    SanitizedConfiguration job = makeJob(KEY_A, 1);
-    expect(quotaManager.checkQuota(
-        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
-        anyObject(ITaskConfig.class),
-        anyInt())).andReturn(NOT_ENOUGH_QUOTA);
-
-    control.replay();
-
-    buildScheduler();
-    scheduler.createJob(job);
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testFilterFailRejectsAddInstances() throws Exception {
-    IJobConfiguration job = makeJob(KEY_A, 1).getJobConfig();
-    expect(quotaManager.checkQuota(
-        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
-        anyObject(ITaskConfig.class),
-        anyInt())).andReturn(NOT_ENOUGH_QUOTA);
-
-    control.replay();
-
-    buildScheduler();
-    scheduler.addInstances(job.getKey(), ImmutableSet.of(1), job.getTaskConfig());
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testMaxJobCheckFailsForAddInstances() throws Exception {
-    IJobConfiguration job = makeJob(KEY_A, 1).getJobConfig();
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.addInstances(
-        job.getKey(),
-        ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
-            DiscreteDomain.integers()),
-        job.getTaskConfig());
-  }
-
-  @Test
-  public void testAddInstances() throws Exception {
-    TaskConfig existingTask = productionTask();
-    TaskConfig newTask = productionTask()
-        .setEnvironment(ENV_A)
-        .setJobName(KEY_A.getName())
-        .setOwner(OWNER_A);
-    ImmutableSet<Integer> instances = ImmutableSet.of(1);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, existingTask, 1));
-
-    assertTaskCount(1);
-    scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
-    assertTaskCount(2);
-  }
-
-  @Test
-  public void testAddInstancesNoExistingTasks() throws Exception {
-    TaskConfig newTask = productionTask()
-        .setEnvironment(ENV_A)
-        .setJobName(KEY_A.getName())
-        .setOwner(OWNER_A);
-
-    ImmutableSet<Integer> instances = ImmutableSet.of(1);
-
-    control.replay();
-    buildScheduler();
-
-    assertTaskCount(0);
-    scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
-    assertTaskCount(1);
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testAddInstancesIdCollision() throws Exception {
-    TaskConfig existingTask = productionTask();
-    TaskConfig newTask = productionTask()
-        .setEnvironment(ENV_A)
-        .setJobName(KEY_A.getName())
-        .setOwner(OWNER_A);
-
-    ImmutableSet<Integer> instances = ImmutableSet.of(0);
-    expectNoCronJob(KEY_A);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, existingTask, 1));
-
-    assertTaskCount(1);
-    scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
-  }
-
-  private void expectNoCronJob(IJobKey jobKey) throws CronException {
-    expect(cronJobManager.hasJob(jobKey)).andReturn(false);
-  }
-
-  private static String getLocalHost() {
-    try {
-      return InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  // TODO(William Farner): Inject a task ID generation function into StateManager so that we can
-  //     expect specific task IDs to be killed here.
-  private void expectKillTask(int numTasks) {
-    driver.killTask(EasyMock.<String>anyObject());
-    expectLastCall().times(numTasks);
-  }
-
-  private void assertTaskCount(int numTasks) {
-    assertEquals(numTasks, Storage.Util.consistentFetchTasks(storage, Query.unscoped()).size());
-  }
-
-  private static Identity makeIdentity(String role) {
-    return new Identity().setRole(role).setUser(USER_A);
-  }
-
-  private static Identity makeIdentity(JobKey jobKey) {
-    return makeIdentity(jobKey.getRole());
-  }
-
-  private static SanitizedConfiguration makeJob(IJobKey jobKey, int numDefaultTasks)
-      throws TaskDescriptionException  {
-
-    return makeJob(jobKey, productionTask(), numDefaultTasks);
-  }
-
-  private static SanitizedConfiguration makeJob(IJobKey jobKey, TaskConfig task)
-      throws TaskDescriptionException {
-
-    return makeJob(jobKey, task, 1);
-  }
-
-  private static SanitizedConfiguration makeJob(
-      IJobKey jobKey,
-      TaskConfig task,
-      int numTasks) throws TaskDescriptionException  {
-
-    JobConfiguration job = new JobConfiguration()
-        .setOwner(makeIdentity(jobKey.newBuilder()))
-        .setKey(jobKey.newBuilder())
-        .setInstanceCount(numTasks)
-        .setTaskConfig(new TaskConfig(task)
-          .setOwner(makeIdentity(jobKey.newBuilder()))
-          .setEnvironment(jobKey.getEnvironment())
-          .setJobName(jobKey.getName()));
-    return SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(job));
-  }
-
-  private static TaskConfig defaultTask(boolean production) {
-    return new TaskConfig()
-        .setNumCpus(1)
-        .setRamMb(1024)
-        .setDiskMb(1024)
-        .setProduction(production)
-        .setExecutorConfig(new ExecutorConfig("aurora", "thermos"))
-        // Avoid per-host scheduling constraints.
-        .setConstraints(Sets.newHashSet(hostLimitConstraint(100)))
-        .setContactEmail("testing@twitter.com");
-  }
-
-  private static TaskConfig productionTask() {
-    return defaultTask(true);
-  }
-
-  private static TaskConfig nonProductionTask() {
-    return defaultTask(false);
-  }
-
-  private IScheduledTask getTask(String taskId) {
-    return getOnlyTask(Query.taskScoped(taskId));
-  }
-
-  private IScheduledTask getOnlyTask(Query.Builder query) {
-    return Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(storage, query));
-  }
-
-  private Set<IScheduledTask> getTasks(Query.Builder query) {
-    return Storage.Util.consistentFetchTasks(storage, query);
-  }
-
-  private Set<IScheduledTask> getTasksByStatus(ScheduleStatus status) {
-    return Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(status));
-  }
-
-  public void changeStatus(
-      Query.Builder query,
-      ScheduleStatus status,
-      Optional<String> message) {
-
-    for (String taskId : Tasks.ids(Storage.Util.consistentFetchTasks(storage, query))) {
-      stateManager.changeState(taskId, Optional.<ScheduleStatus>absent(), status, message);
-    }
-  }
-
-  public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {
-    for (ScheduleStatus nextStatus
-        : ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
-
-      changeStatus(query, nextStatus, Optional.<String>absent());
-    }
-  }
-
-  public void changeStatus(String taskId, ScheduleStatus status, ScheduleStatus... statuses) {
-    changeStatus(Query.taskScoped(taskId), status, statuses);
-  }
-
-  public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
-    changeStatus(Query.taskScoped(taskId), status, message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 1678411..cdd29ea 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -36,6 +36,7 @@ import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -43,6 +44,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
@@ -76,6 +78,7 @@ public class StateManagerImplTest extends EasyMockTest {
   private static final String HOST_A = "host_a";
   private static final Identity JIM = new Identity("jim", "jim-user");
   private static final String MY_JOB = "myJob";
+  private static final IJobKey JOB_KEY = JobKeys.from(JIM.getRole(), DEFAULT_ENVIRONMENT, MY_JOB);
 
   private Driver driver;
   private TaskIdGenerator taskIdGenerator;
@@ -466,6 +469,27 @@ public class StateManagerImplTest extends EasyMockTest {
     assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void insertEmptyPendingInstancesFails() {
+    control.replay();
+    stateManager.insertPendingTasks(makeTask(JIM, MY_JOB), ImmutableSet.<Integer>of());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void insertPendingInstancesInstanceCollision() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId).times(2);
+    expectStateTransitions(taskId, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(task, 0);
+    Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+
+    insertTask(task, 0);
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,
@@ -489,7 +513,7 @@ public class StateManagerImplTest extends EasyMockTest {
   }
 
   private void insertTask(ITaskConfig task, int instanceId) {
-    stateManager.insertPendingTasks(ImmutableMap.of(instanceId, task));
+    stateManager.insertPendingTasks(task, ImmutableSet.of(instanceId));
   }
 
   private boolean changeState(String taskId, ScheduleStatus status) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
new file mode 100644
index 0000000..8f18617
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.Map;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskLimitValidatorImpl;
+import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskValidationException;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+
+public class TaskLimitValidatorTest extends EasyMockTest {
+  private static final Identity JIM = new Identity("jim", "jim-user");
+  private static final String MY_JOB = "myJob";
+  private static final String TASK_ID = "a";
+
+  private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
+  private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
+
+  private TaskIdGenerator taskIdGenerator;
+  private QuotaManager quotaManager;
+  private TaskLimitValidatorImpl taskLimitValidator;
+
+  @Before
+  public void setUp() {
+    taskIdGenerator = createMock(TaskIdGenerator.class);
+    quotaManager = createMock(QuotaManager.class);
+    taskLimitValidator = new TaskLimitValidatorImpl(taskIdGenerator, quotaManager);
+  }
+
+  @Test
+  public void testValidateTask() throws Exception {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    expect(taskIdGenerator.generate(task, 1)).andReturn(TASK_ID);
+    expect(quotaManager.checkQuota(
+        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
+        anyObject(ITaskConfig.class),
+        anyInt())).andStubReturn(ENOUGH_QUOTA);
+
+    control.replay();
+
+    taskLimitValidator.validateTaskLimits(task, 1);
+  }
+
+  @Test(expected = TaskValidationException.class)
+  public void testValidatesFailsTaskIdTooLong() throws Exception {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    expect(taskIdGenerator.generate(task, 1))
+        .andReturn(Strings.repeat(TASK_ID, TaskLimitValidatorImpl.MAX_TASK_ID_LENGTH + 1));
+
+    control.replay();
+
+    taskLimitValidator.validateTaskLimits(task, 1);
+  }
+
+  @Test(expected = TaskValidationException.class)
+  public void testValidatesFailsTooManyInstances() throws Exception {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    expect(taskIdGenerator.generate(task, TaskLimitValidatorImpl.MAX_TASKS_PER_JOB.get() + 1))
+        .andReturn(TASK_ID);
+
+    control.replay();
+
+    taskLimitValidator.validateTaskLimits(task, TaskLimitValidatorImpl.MAX_TASKS_PER_JOB.get() + 1);
+  }
+
+  @Test(expected = TaskValidationException.class)
+  public void testValidatesFailsQuotaCheck() throws Exception {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    expect(taskIdGenerator.generate(task, 1)).andReturn(TASK_ID);
+    expect(quotaManager.checkQuota(
+        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
+        anyObject(ITaskConfig.class),
+        anyInt())).andStubReturn(NOT_ENOUGH_QUOTA);
+
+    control.replay();
+
+    taskLimitValidator.validateTaskLimits(task, 1);
+  }
+
+  private static ITaskConfig makeTask(Identity owner, String job) {
+    return ITaskConfig.build(new TaskConfig()
+        .setOwner(owner)
+        .setEnvironment(DEFAULT_ENVIRONMENT)
+        .setJobName(job)
+        .setRequestedPorts(ImmutableSet.<String>of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
index ad2548c..80646a6 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -106,18 +106,19 @@ public class StorageBackfillTest {
 
     final AtomicInteger taskId = new AtomicInteger();
 
-    SanitizedConfiguration job = makeJob(JOB_KEY, defaultTask(), 10);
+    final TaskConfig task = defaultTask();
+    SanitizedConfiguration job = makeJob(JOB_KEY, task, 10);
     final Set<IScheduledTask> badTasks = ImmutableSet.copyOf(Iterables.transform(
-        job.getTaskConfigs().values(),
-        new Function<ITaskConfig, IScheduledTask>() {
+        job.getInstanceIds(),
+        new Function<Integer, IScheduledTask>() {
           @Override
-          public IScheduledTask apply(ITaskConfig task) {
+          public IScheduledTask apply(Integer instanceId) {
             return IScheduledTask.build(new ScheduledTask()
                 .setStatus(RUNNING)
                 .setAssignedTask(new AssignedTask()
                     .setInstanceId(0)
                     .setTaskId("task-" + taskId.incrementAndGet())
-                    .setTask(task.newBuilder())));
+                    .setTask(task)));
           }
         }));
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4920a8b8/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageSchedulerCoreImplTest.java
deleted file mode 100644
index 35bed10..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemStorageSchedulerCoreImplTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import org.apache.aurora.scheduler.state.BaseSchedulerCoreImplTest;
-import org.apache.aurora.scheduler.storage.Storage;
-
-public class MemStorageSchedulerCoreImplTest extends BaseSchedulerCoreImplTest {
-
-  @Override
-  protected Storage createStorage() {
-    return MemStorage.newEmptyStorage();
-  }
-}


Mime
View raw message