aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] git commit: Implementing quota checking for async job updates.
Date Wed, 24 Sep 2014 21:27:10 GMT
Implementing quota checking for async job updates.

Bugs closed: AURORA-686

Reviewed at https://reviews.apache.org/r/25812/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/2e822238
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/2e822238
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/2e822238

Branch: refs/heads/master
Commit: 2e822238a77495997449d1190d07408be004413d
Parents: 2298914
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Wed Sep 24 14:26:47 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Wed Sep 24 14:26:47 2014 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationManager.java     |   4 +-
 .../aurora/scheduler/quota/QuotaManager.java    | 181 ++++++----
 .../aurora/scheduler/quota/QuotaUtil.java       | 128 +++++++
 .../aurora/scheduler/state/StateModule.java     |   3 -
 .../scheduler/state/TaskLimitValidator.java     | 112 -------
 .../thrift/SchedulerThriftInterface.java        |  93 +++++-
 .../scheduler/updater/JobUpdateController.java  |  10 +
 .../updater/JobUpdateControllerImpl.java        |   8 +-
 .../thrift/org/apache/aurora/gen/api.thrift     |   6 +
 .../scheduler/quota/QuotaManagerImplTest.java   | 334 ++++++++++++++-----
 .../scheduler/state/TaskLimitValidatorTest.java | 117 -------
 .../storage/testing/StorageTestUtil.java        |   6 -
 .../thrift/SchedulerThriftInterfaceTest.java    | 207 +++++++++---
 .../aurora/scheduler/thrift/ThriftIT.java       |   4 +-
 14 files changed, 756 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 3661f84..8657421 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -232,11 +232,11 @@ public final class ConfigurationManager {
     }
 
     if (!job.isSetInstanceCount()) {
-      throw new TaskDescriptionException("Job configuration does not have shardCount set.");
+      throw new TaskDescriptionException("Job configuration does not have instanceCount set.");
     }
 
     if (job.getInstanceCount() <= 0) {
-      throw new TaskDescriptionException("Shard count must be positive.");
+      throw new TaskDescriptionException("Instance count must be positive.");
     }
 
     JobConfiguration builder = job.newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 14b0dd8..ea6b7d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -13,23 +13,41 @@
  */
 package org.apache.aurora.scheduler.quota;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.updater.JobUpdateController;
 
 import static java.util.Objects.requireNonNull;
 
@@ -57,19 +75,14 @@ public interface QuotaManager {
   QuotaInfo getQuotaInfo(String role);
 
   /**
-   * Checks if there is enough resource quota available for adding {@code instances} of
-   * {@code template} tasks provided resources consumed by {@code releasedTemplates} tasks
-   * are released. The quota is defined at the task owner (role) level.
+   * Checks if there is enough resource quota available for adding production resources specified
+   * in {@code requestedProdResources}. The quota is defined at the task owner (role) level.
    *
-   * @param releasedTemplates Map of task resources to number of task instances to be released.
-   * @param newTemplate Single task resource requirements.
-   * @param instances Number of task instances.
+   * @param role Role to check quota for.
+   * @param requestedProdResources Additional production resources requested.
    * @return {@code QuotaComparisonResult} instance with quota check result details.
    */
-  QuotaCheckResult checkQuota(
-      Map<ITaskConfig, Integer> releasedTemplates,
-      ITaskConfig newTemplate,
-      int instances);
+  QuotaCheckResult checkQuota(String role, IResourceAggregate requestedProdResources);
 
   /**
    * Thrown when quota related operation failed.
@@ -116,13 +129,14 @@ public interface QuotaManager {
       return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
         @Override
         public QuotaInfo apply(StoreProvider storeProvider) {
-          FluentIterable<ITaskConfig> tasks = FluentIterable
-              .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
-              .transform(Tasks.SCHEDULED_TO_INFO);
+          FluentIterable<IScheduledTask> tasks = FluentIterable.from(
+              storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
 
-          IResourceAggregate prodConsumed = fromTasks(tasks.filter(Tasks.IS_PRODUCTION));
-          IResourceAggregate nonProdConsumed =
-              fromTasks(tasks.filter(Predicates.not(Tasks.IS_PRODUCTION)));
+          IResourceAggregate prodConsumed =
+              getProdConsumption(storeProvider.getJobUpdateStore(), role, tasks);
+
+          IResourceAggregate nonProdConsumed = QuotaUtil.fromTasks(
+              tasks.transform(Tasks.SCHEDULED_TO_INFO).filter(Predicates.not(Tasks.IS_PRODUCTION)));
 
           IResourceAggregate quota =
               storeProvider.getQuotaStore().fetchQuota(role).or(ResourceAggregates.none());
@@ -133,53 +147,107 @@ public interface QuotaManager {
     }
 
     @Override
-    public QuotaCheckResult checkQuota(
-        Map<ITaskConfig, Integer> releasedTemplates,
-        ITaskConfig newTemplate,
-        int instances) {
-
-      if (!newTemplate.isProduction()) {
+    public QuotaCheckResult checkQuota(String role, IResourceAggregate requestedProdResources) {
+      if (ResourceAggregates.EMPTY.equals(requestedProdResources)) {
         return new QuotaCheckResult(SUFFICIENT_QUOTA);
       }
 
-      QuotaInfo quotaInfo = getQuotaInfo(JobKeys.from(newTemplate).getRole());
+      QuotaInfo quotaInfo = getQuotaInfo(role);
+
+      return QuotaCheckResult.greaterOrEqual(
+          quotaInfo.getQuota(),
+          add(quotaInfo.getProdConsumption(), requestedProdResources));
+    }
+
+    private IResourceAggregate getProdConsumption(
+        JobUpdateStore jobUpdateStore,
+        String role,
+        FluentIterable<IScheduledTask> tasks) {
+
+      // The algorithm here is as follows:
+      // 1. Load all active production tasks that either belong to jobs without active updates
+      //    or are not part of an active update's working set. Examples of the latter are
+      //    instances the update skips because they are already in the desired state or fall
+      //    outside the update range (e.g. not in JobUpdateInstructions.updateOnlyTheseInstances).
+      //    Calculate consumed resources as "nonUpdateConsumption".
+      //
+      // 2. Calculate consumed resources from instances affected by the active job updates as
+      //    "updateConsumption".
+      //
+      // 3. Add up the two to yield total prod consumption.
+
+      final Map<IJobKey, IJobUpdate> roleJobUpdates =
+          fetchActiveJobUpdates(jobUpdateStore, role).uniqueIndex(UPDATE_TO_JOB_KEY);
+
+      IResourceAggregate nonUpdateConsumption = QuotaUtil.prodResourcesFromTasks(tasks
+          .filter(buildNonUpdatingTasksFilter(roleJobUpdates))
+          .transform(Tasks.SCHEDULED_TO_INFO));
+
+      IResourceAggregate updateConsumption = ResourceAggregates.EMPTY;
+      for (IJobUpdate update : roleJobUpdates.values()) {
+        updateConsumption = add(updateConsumption, QuotaUtil.prodResourcesFromJobUpdate(update));
+      }
+
+      return add(nonUpdateConsumption, updateConsumption);
+    }
+
+    private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter(
+        final Map<IJobKey, IJobUpdate> roleJobUpdates) {
 
-      // Calculate total additional requested resources.
-      IResourceAggregate additionalRequested =
-          ResourceAggregates.scale(fromTasks(ImmutableSet.of(newTemplate)), instances);
+      return new Predicate<IScheduledTask>() {
+        @Override
+        public boolean apply(IScheduledTask input) {
+          Optional<IJobUpdate> update = Optional.fromNullable(
+              roleJobUpdates.get(JobKeys.from(input.getAssignedTask().getTask())));
+
+          if (update.isPresent()) {
+            IInstanceTaskConfig configs =
+                update.get().getInstructions().getDesiredState();
+            RangeSet<Integer> desiredInstances = rangesToRangeSet(configs.getInstances());
 
-      // Calculate total released resources (i.e. resources freed up from removed instances).
-      IResourceAggregate totalReleased = ResourceAggregates.EMPTY;
-      for (Map.Entry<ITaskConfig, Integer> entry : releasedTemplates.entrySet()) {
-        ITaskConfig released = entry.getKey();
-        if (released.isProduction()) {
-          totalReleased = add(
-              totalReleased,
-              ResourceAggregates.scale(fromTasks(ImmutableSet.of(released)), entry.getValue()));
+            return !desiredInstances.contains(input.getAssignedTask().getInstanceId());
+          }
+          return true;
         }
+      };
+    }
+
+    private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
+        new Function<IJobUpdate, IJobKey>() {
+          @Override
+          public IJobKey apply(IJobUpdate input) {
+            return input.getSummary().getJobKey();
+          }
+        };
+
+    private static FluentIterable<IJobUpdate> fetchActiveJobUpdates(
+        JobUpdateStore jobUpdateStore,
+        String role) {
+
+      List<IJobUpdateSummary> summaries = jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role));
+
+      Set<IJobUpdate> updates = Sets.newHashSet();
+      for (IJobUpdateSummary summary : summaries) {
+        updates.add(jobUpdateStore.fetchJobUpdate(summary.getUpdateId()).get());
       }
 
-      // Subtract released resources from additional requested (this is the overall net change) and
-      // compare the result against available production quota.
-      return QuotaCheckResult.greaterOrEqual(
-          quotaInfo.getQuota(),
-          add(quotaInfo.getProdConsumption(), subtract(additionalRequested, totalReleased)));
+      return FluentIterable.from(updates);
+    }
+
+    @VisibleForTesting
+    static IJobUpdateQuery updateQuery(String role) {
+      return IJobUpdateQuery.build(new JobUpdateQuery()
+          .setRole(role)
+          .setUpdateStatuses(JobUpdateController.ACTIVE_JOB_UPDATE_STATES));
     }
 
-    private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
-      double cpu = 0;
-      int ramMb = 0;
-      int diskMb = 0;
-      for (ITaskConfig task : tasks) {
-        cpu += task.getNumCpus();
-        ramMb += task.getRamMb();
-        diskMb += task.getDiskMb();
+    private static RangeSet<Integer> rangesToRangeSet(Set<IRange> ranges) {
+      ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder();
+      for (IRange range : ranges) {
+        builder.add(Range.closed(range.getFirst(), range.getLast()));
       }
 
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(cpu)
-          .setRamMb(ramMb)
-          .setDiskMb(diskMb));
+      return builder.build();
     }
 
     private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
@@ -188,12 +256,5 @@ public interface QuotaManager {
           .setRamMb(a.getRamMb() + b.getRamMb())
           .setDiskMb(a.getDiskMb() + b.getDiskMb()));
     }
-
-    private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(a.getNumCpus() - b.getNumCpus())
-          .setRamMb(a.getRamMb() - b.getRamMb())
-          .setDiskMb(a.getDiskMb() - b.getDiskMb()));
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
new file mode 100644
index 0000000..105426f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import java.util.Set;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Static utility helpers for quota validation.
+ */
+public final class QuotaUtil {
+  private QuotaUtil() {
+    // Utility class.
+  }
+
+  /**
+   * Converts a set of production {@link ITaskConfig} templates into a {@link IResourceAggregate}.
+   * <p>
+   * Discards any templates with {@code isProduction = false}.
+   *
+   * @param tasks Set of tasks to convert.
+   * @return Aggregate resources consumed by {@code tasks}.
+   */
+  public static IResourceAggregate prodResourcesFromTasks(Iterable<ITaskConfig> tasks) {
+    return fromTasks(FluentIterable.from(tasks).filter(Tasks.IS_PRODUCTION));
+  }
+
+  /**
+   * Converts a {@link IJobUpdate} into a {@link IResourceAggregate}.
+   * <p>
+   * This function calculates max aggregate production resources consumed by the
+   * {@code jobUpdate}. The max is calculated between existing and desired task configs on per
+   * resource basis. This means max CPU, RAM and DISK values are computed individually and may
+   * come from different task configurations. While it may not be the most accurate representation
+   * of job update resources during the update, it does guarantee none of the individual resource
+   * values is exceeded during the forward/back roll.
+   *
+   * @param jobUpdate Job update to convert.
+   * @return Max production aggregate resources represented by the {@code jobUpdate}.
+   */
+  public static IResourceAggregate prodResourcesFromJobUpdate(IJobUpdate jobUpdate) {
+    IJobUpdateInstructions instructions = jobUpdate.getInstructions();
+
+    // Calculate existing prod task consumption.
+    double existingCpu = 0;
+    int existingRamMb = 0;
+    int existingDiskMb = 0;
+    for (IInstanceTaskConfig group : instructions.getInitialState()) {
+      ITaskConfig task = group.getTask();
+      if (task.isProduction()) {
+        for (IRange range : group.getInstances()) {
+          int numInstances = range.getLast() - range.getFirst() + 1;
+          existingCpu += task.getNumCpus() * numInstances;
+          existingRamMb += task.getRamMb() * numInstances;
+          existingDiskMb += task.getDiskMb() * numInstances;
+        }
+      }
+    }
+
+    // Calculate desired prod task consumption.
+    ITaskConfig desiredConfig = instructions.getDesiredState().getTask();
+    IResourceAggregate desired = desiredConfig.isProduction()
+        ? ResourceAggregates.scale(
+        prodResourcesFromTasks(ImmutableSet.of(desiredConfig)),
+        getUpdateInstanceCount(instructions.getDesiredState().getInstances()))
+        : ResourceAggregates.EMPTY;
+
+    // Calculate result as max(existing, desired) per resource.
+    return IResourceAggregate.build(new ResourceAggregate()
+        .setNumCpus(Math.max(existingCpu, desired.getNumCpus()))
+        .setRamMb(Math.max(existingRamMb, desired.getRamMb()))
+        .setDiskMb(Math.max(existingDiskMb, desired.getDiskMb())));
+  }
+
+  /**
+   * Converts a set of {@link ITaskConfig} templates into a {@link IResourceAggregate}.
+   *
+   * @param tasks Set of tasks to convert.
+   * @return Aggregate resources consumed by {@code tasks}.
+   */
+  public static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
+    double cpu = 0;
+    int ramMb = 0;
+    int diskMb = 0;
+    for (ITaskConfig task : tasks) {
+      cpu += task.getNumCpus();
+      ramMb += task.getRamMb();
+      diskMb += task.getDiskMb();
+    }
+
+    return IResourceAggregate.build(new ResourceAggregate()
+        .setNumCpus(cpu)
+        .setRamMb(ramMb)
+        .setDiskMb(diskMb));
+  }
+
+  private static int getUpdateInstanceCount(Set<IRange> ranges) {
+    int instanceCount = 0;
+    for (IRange range : ranges) {
+      instanceCount += range.getLast() - range.getFirst() + 1;
+    }
+
+    return instanceCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 54b9012..fe16d3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -24,7 +24,6 @@ import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskLimitValidatorImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
 /**
@@ -45,8 +44,6 @@ public class StateModule extends AbstractModule {
     bind(UUIDGeneratorImpl.class).in(Singleton.class);
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
-    bind(TaskLimitValidator.class).to(TaskLimitValidatorImpl.class);
-    bind(TaskLimitValidatorImpl.class).in(Singleton.class);
 
     bindMaintenanceController(binder());
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java b/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
deleted file mode 100644
index 7033373..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskLimitValidator.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.Positive;
-
-import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.quota.QuotaCheckResult;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
-
-/**
- * Validates task-specific requirements including name, count and quota checks.
- */
-public interface TaskLimitValidator {
-
-  /**
-   * Validates adding {@code instances} of {@code task} does not violate certain task/job limits.
-   * <p>
-   * Validated rules:
-   * <ul>
-   *   <li>Max task ID length</li>
-   *   <li>Max number of tasks per job</li>
-   *   <li>Role resource quota</li>
-   * </ul>
-   *
-   * @param task Task configuration.
-   * @param newInstances Number of new task instances.
-   * @throws {@link TaskValidationException} If validation fails.
-   */
-  void validateTaskLimits(ITaskConfig task, int newInstances) throws TaskValidationException;
-
-  /**
-   * Thrown when task fails validation.
-   */
-  class TaskValidationException extends Exception {
-    public TaskValidationException(String msg) {
-      super(msg);
-    }
-  }
-
-  class TaskLimitValidatorImpl implements TaskLimitValidator {
-
-    @Positive
-    @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
-    public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
-
-    // This number is derived from the maximum file name length limit on most UNIX systems, less
-    // the number of characters we've observed being added by mesos for the executor ID, prefix, and
-    // delimiters.
-    @VisibleForTesting
-    static final int MAX_TASK_ID_LENGTH = 255 - 90;
-
-    private final TaskIdGenerator taskIdGenerator;
-    private final QuotaManager quotaManager;
-
-    @Inject
-    TaskLimitValidatorImpl(TaskIdGenerator taskIdGenerator, QuotaManager quotaManager) {
-      this.taskIdGenerator = requireNonNull(taskIdGenerator);
-      this.quotaManager = requireNonNull(quotaManager);
-    }
-
-    @Override
-    public void validateTaskLimits(ITaskConfig task, int newInstances)
-        throws TaskValidationException {
-
-      // TODO(maximk): This is a short-term hack to stop the bleeding from
-      //               https://issues.apache.org/jira/browse/MESOS-691
-      if (taskIdGenerator.generate(task, newInstances).length() > MAX_TASK_ID_LENGTH) {
-        throw new TaskValidationException(
-            "Task ID is too long, please shorten your role or job name.");
-      }
-
-      // TODO(maximk): This check must consider ALL existing tasks not just the new instances.
-      if (newInstances > MAX_TASKS_PER_JOB.get()) {
-        throw new TaskValidationException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
-      }
-
-      QuotaCheckResult quotaCheck = quotaManager.checkQuota(
-          ImmutableMap.<ITaskConfig, Integer>of(),
-          task,
-          newInstances);
-
-      if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
-        throw new TaskValidationException("Insufficient resource quota: "
-            + quotaCheck.getDetails().or(""));
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index ae0320b..4602dbb 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -47,6 +47,9 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
 
 import org.apache.aurora.auth.CapabilityValidator;
 import org.apache.aurora.auth.CapabilityValidator.AuditCheck;
@@ -104,10 +107,12 @@ import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.StartMaintenanceResult;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Jobs;
 import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
@@ -119,14 +124,15 @@ import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
+import org.apache.aurora.scheduler.quota.QuotaUtil;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskLimitValidator;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -176,7 +182,7 @@ import static org.apache.aurora.gen.ResponseCode.OK;
 import static org.apache.aurora.gen.ResponseCode.WARNING;
 import static org.apache.aurora.gen.apiConstants.CURRENT_API_VERSION;
 import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
-import static org.apache.aurora.scheduler.state.TaskLimitValidator.TaskValidationException;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.thrift.Util.addMessage;
 import static org.apache.aurora.scheduler.thrift.Util.emptyResponse;
 
@@ -188,6 +194,16 @@ import static org.apache.aurora.scheduler.thrift.Util.emptyResponse;
  */
 @DecoratedThrift
 class SchedulerThriftInterface implements AuroraAdmin.Iface {
+  @Positive
+  @CmdLine(name = "max_tasks_per_job", help = "Maximum number of allowed tasks in a single job.")
+  public static final Arg<Integer> MAX_TASKS_PER_JOB = Arg.create(4000);
+
+  // This number is derived from the maximum file name length limit on most UNIX systems, less
+  // the number of characters we've observed being added by mesos for the executor ID, prefix, and
+  // delimiters.
+  @VisibleForTesting
+  static final int MAX_TASK_ID_LENGTH = 255 - 90;
+
   private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());
 
   private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
@@ -210,7 +226,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final QuotaManager quotaManager;
   private final NearestFit nearestFit;
   private final StateManager stateManager;
-  private final TaskLimitValidator taskLimitValidator;
+  private final TaskIdGenerator taskIdGenerator;
   private final UUIDGenerator uuidGenerator;
   private final JobUpdateController jobUpdateController;
   private final boolean isUpdaterEnabled;
@@ -232,7 +248,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       QuotaManager quotaManager,
       NearestFit nearestFit,
       StateManager stateManager,
-      TaskLimitValidator taskLimitValidator,
+      TaskIdGenerator taskIdGenerator,
       UUIDGenerator uuidGenerator,
       JobUpdateController jobUpdateController,
       @EnableUpdater boolean isUpdaterEnabled) {
@@ -248,7 +264,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.quotaManager = requireNonNull(quotaManager);
     this.nearestFit = requireNonNull(nearestFit);
     this.stateManager = requireNonNull(stateManager);
-    this.taskLimitValidator = requireNonNull(taskLimitValidator);
+    this.taskIdGenerator = requireNonNull(taskIdGenerator);
     this.uuidGenerator = requireNonNull(uuidGenerator);
     this.jobUpdateController = requireNonNull(jobUpdateController);
     this.isUpdaterEnabled = isUpdaterEnabled;
@@ -295,7 +311,12 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                 "Job already exists: " + JobKeys.canonicalString(job.getKey()));
           }
 
-          taskLimitValidator.validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
+          ITaskConfig taskConfig = sanitized.getJobConfig().getTaskConfig();
+          int instanceCount = sanitized.getInstanceIds().size();
+
+          validateTaskLimits(taskConfig, instanceCount, ResourceAggregates.scale(
+              QuotaUtil.prodResourcesFromTasks(ImmutableSet.of(taskConfig)),
+              instanceCount));
 
           // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454)
           if (sanitized.isCron()) {
@@ -303,10 +324,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                 + " with cron via createJob (AURORA_454)");
             cronJobManager.createJob(SanitizedCronJob.from(sanitized));
           } else {
-            LOG.info("Launching " + sanitized.getInstanceIds().size() + " tasks.");
-            stateManager.insertPendingTasks(
-                sanitized.getJobConfig().getTaskConfig(),
-                sanitized.getInstanceIds());
+            LOG.info("Launching " + instanceCount + " tasks.");
+            stateManager.insertPendingTasks(taskConfig, sanitized.getInstanceIds());
           }
           return response.setResponseCode(OK);
         } catch (LockException e) {
@@ -1220,11 +1239,17 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               ILockKey.build(LockKey.job(jobKey.newBuilder())),
               Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-          Set<Integer> instanceIds = ImmutableSet.copyOf(config.getInstanceIds());
-          taskLimitValidator.validateTaskLimits(task, instanceIds.size());
+          ImmutableSet<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks(
+              Query.jobScoped(JobKeys.from(task)).active());
 
-          stateManager.insertPendingTasks(task, instanceIds);
+          validateTaskLimits(
+              task,
+              currentTasks.size() + config.getInstanceIdsSize(),
+              ResourceAggregates.scale(
+                  QuotaUtil.prodResourcesFromTasks(ImmutableSet.of(task)),
+                  config.getInstanceIdsSize()));
 
+          stateManager.insertPendingTasks(task, ImmutableSet.copyOf(config.getInstanceIds()));
           return resp.setResponseCode(OK);
         } catch (LockException e) {
           return addMessage(resp, LOCK_ERROR, e);
@@ -1301,6 +1326,39 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         new GetLocksResult().setLocks(ILock.toBuildersSet(lockManager.getLocks()))));
   }
 
+  private static class TaskValidationException extends Exception {
+    public TaskValidationException(String message) {
+      super(message);
+    }
+  }
+
+  private void validateTaskLimits(
+      ITaskConfig task,
+      int totalInstances,
+      IResourceAggregate requestedProdResources) throws TaskValidationException {
+
+    if (totalInstances <= 0 || totalInstances > MAX_TASKS_PER_JOB.get()) {
+      throw new TaskValidationException(String.format(
+          "Instance count must be between 1 and %d inclusive.",
+          MAX_TASKS_PER_JOB.get()));
+    }
+
+    // TODO(maximk): This is a short-term hack to stop the bleeding from
+    //               https://issues.apache.org/jira/browse/MESOS-691
+    if (taskIdGenerator.generate(task, totalInstances).length() > MAX_TASK_ID_LENGTH) {
+      throw new TaskValidationException(
+          "Task ID is too long, please shorten your role or job name.");
+    }
+
+    QuotaCheckResult quotaCheck =
+        quotaManager.checkQuota(task.getOwner().getRole(), requestedProdResources);
+
+    if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
+      throw new TaskValidationException("Insufficient resource quota: "
+          + quotaCheck.getDetails().or(""));
+    }
+  }
+
   private static final Function<Collection<Integer>, Set<Range<Integer>>> TO_RANGES  =
       new Function<Collection<Integer>, Set<Range<Integer>>>() {
         @Override
@@ -1384,8 +1442,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
-        // TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
-
         String updateId = uuidGenerator.createNew().toString();
 
         IJobUpdate update = IJobUpdate.build(new JobUpdate()
@@ -1403,9 +1459,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                         .setLast(request.getInstanceCount() - 1))))));
 
         try {
+          validateTaskLimits(
+              request.getTaskConfig(),
+              request.getInstanceCount(),
+              QuotaUtil.prodResourcesFromJobUpdate(update));
+
           jobUpdateController.start(update, context.getIdentity());
           return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
-        } catch (UpdateStateException | UpdateConfigurationException e) {
+        } catch (TaskValidationException | UpdateStateException | UpdateConfigurationException e) {
           return addMessage(response, INVALID_REQUEST, e);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index 8f9e6d6..3bf81e6 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -13,6 +13,10 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.EnumSet;
+
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
@@ -24,6 +28,12 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 public interface JobUpdateController {
 
   /**
+   * Different states that an active job update may be in.
+   */
+  EnumSet<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES =
+      EnumSet.copyOf(apiConstants.ACTIVE_JOB_UPDATE_STATES);
+
+  /**
    * Initiates an update.
    *
    * @param update Instructions for what job to update, and how to update it.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 348bdbf..7efcaa4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -69,8 +69,6 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
@@ -251,11 +249,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   static IJobUpdateQuery queryByJob(IJobKey job) {
     return IJobUpdateQuery.build(new JobUpdateQuery()
         .setJobKey(job.newBuilder())
-        .setUpdateStatuses(ImmutableSet.of(
-            ROLLING_FORWARD,
-            ROLLING_BACK,
-            ROLL_FORWARD_PAUSED,
-            ROLL_BACK_PAUSED)));
+        .setUpdateStatuses(ACTIVE_JOB_UPDATE_STATES));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 2376a5e..7436a5a 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -543,6 +543,12 @@ enum JobUpdateStatus {
   FAILED = 8
 }
 
+/** States the job update can be in while still considered active. */
+const set<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES = [JobUpdateStatus.ROLLING_FORWARD,
+                                                       JobUpdateStatus.ROLLING_BACK,
+                                                       JobUpdateStatus.ROLL_FORWARD_PAUSED,
+                                                       JobUpdateStatus.ROLL_BACK_PAUSED]
+
 /** Job update actions that can be applied to job instances. */
 enum JobUpdateAction {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 8381e21..11cc2f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -13,20 +13,33 @@
  */
 package org.apache.aurora.scheduler.quota;
 
+import java.util.List;
+
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -37,6 +50,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl.updateQuery;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -44,6 +58,7 @@ import static org.junit.Assert.assertTrue;
 public class QuotaManagerImplTest extends EasyMockTest {
   private static final String ROLE = "test";
   private static final String ENV = "test_env";
+  private static final String JOB_NAME = "job";
   private static final IResourceAggregate QUOTA = IResourceAggregate.build(new ResourceAggregate()
       .setNumCpus(1.0)
       .setRamMb(100L)
@@ -51,152 +66,138 @@ public class QuotaManagerImplTest extends EasyMockTest {
   private static final Query.Builder ACTIVE_QUERY = Query.roleScoped(ROLE).active();
 
   private StorageTestUtil storageUtil;
+  private JobUpdateStore jobUpdateStore;
   private QuotaManagerImpl quotaManager;
 
   @Before
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
+    jobUpdateStore = storageUtil.jobUpdateStore;
     quotaManager = new QuotaManagerImpl(storageUtil.storage);
     storageUtil.expectOperations();
   }
 
   @Test
   public void testGetQuotaInfo() {
-    IScheduledTask prodTask = createTask("foo", "id1", 3, 3, 3, true);
-    IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false);
+    IScheduledTask prodTask = createProdTask("foo", 3, 3, 3);
+    IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false, 0);
     IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
 
     expectQuota(quota);
     expectTasks(prodTask, nonProdTask);
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
 
     control.replay();
 
     QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
     assertEquals(quota, quotaInfo.getQuota());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(3, 3, 3)), quotaInfo.getProdConsumption());
+        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)), quotaInfo.getProdConsumption());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
         quotaInfo.getNonProdConsumption());
   }
 
   @Test
-  public void testGetQuotaInfoNoTasks() {
+  public void testGetQuotaInfoPartialUpdate() {
+    IScheduledTask prodTask = createProdTask("foo", 3, 3, 3);
+    IScheduledTask updatingProdTask = createTask(JOB_NAME, "id1", 3, 3, 3, true, 1);
+    IScheduledTask updatingFilteredProdTask = createTask(JOB_NAME, "id0", 3, 3, 3, true, 0);
+    IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false, 0);
     IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
 
     expectQuota(quota);
-    expectNoTasks();
+    expectTasks(prodTask, updatingProdTask, updatingFilteredProdTask, nonProdTask);
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
 
     control.replay();
 
     QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
     assertEquals(quota, quotaInfo.getQuota());
-    assertEquals(ResourceAggregates.none(), quotaInfo.getProdConsumption());
-    assertEquals(ResourceAggregates.none(), quotaInfo.getNonProdConsumption());
-  }
 
-  @Test
-  public void testCheckQuotaPasses() {
-    expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
-
-    control.replay();
-
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 1, true),
-        1);
+    // Expected consumption from: prodTask + updatingProdTask + job update.
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(7, 7, 7)), quotaInfo.getProdConsumption());
 
-    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
+        quotaInfo.getNonProdConsumption());
   }
 
   @Test
-  public void testCheckQuotaWithReleasedPasses() {
-    expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+  public void testGetQuotaInfoNoTasksNoUpdates() {
+    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
 
-    control.replay();
+    expectQuota(quota);
+    expectNoTasks();
+    expectNoJobUpdates();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.of(createTaskConfig(1, 1, 1, true), 1),
-        createTaskConfig(2, 2, 2, true),
-        1);
+    control.replay();
 
-    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
+    assertEquals(quota, quotaInfo.getQuota());
+    assertEquals(ResourceAggregates.none(), quotaInfo.getProdConsumption());
+    assertEquals(ResourceAggregates.none(), quotaInfo.getNonProdConsumption());
   }
 
   @Test
-  public void testCheckQuotaWithReleasedMoreThanAddedPasses() {
+  public void testCheckQuotaPasses() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectTasks(createProdTask("foo", 2, 2, 2));
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.of(createTaskConfig(3, 3, 3, true), 1),
-        createTaskConfig(1, 1, 1, true),
-        1);
-
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testCheckQuotaWithReleasedNonToProdPasses() {
+  public void testCheckQuotaPassesNoTasks() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectNoTasks();
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.of(createTaskConfig(1, 1, 1, false), 1),
-        createTaskConfig(1, 1, 1, true),
-        1);
-
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testCheckQuotaWithReleasedNonToProdFails() {
+  public void testCheckQuotaPassesNoUpdates() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectTasks(createProdTask("foo", 2, 2, 2));
+    expectNoJobUpdates();
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.of(createTaskConfig(3, 3, 3, false), 1),
-        createTaskConfig(2, 2, 2, true),
-        1);
-
-    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testCheckQuotaPassesNoTasks() {
+  public void testCheckQuotaPassesNoTasksNoUpdates() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectNoTasks();
+    expectNoJobUpdates();
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 1, true),
-        1);
-
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaPassesNonProdUnaccounted() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true), createTask("bar", "id2", 5, 5, 5, false));
+    expectTasks(createProdTask("foo", 2, 2, 2), createTask("bar", "id2", 5, 5, 5, false, 0));
 
-    control.replay();
+    expectNoJobUpdates();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 1, true),
-        1);
+    control.replay();
 
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -204,11 +205,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaSkippedForNonProdRequest() {
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 1, false),
-        1);
-
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, ResourceAggregates.EMPTY);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -218,28 +215,23 @@ public class QuotaManagerImplTest extends EasyMockTest {
         .andReturn(Optional.<IResourceAggregate>absent());
 
     expectNoTasks();
+    expectNoJobUpdates();
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 1, true),
-        1);
-
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaExceedsCpu() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectTasks(createProdTask("foo", 3, 3, 3));
+    expectNoJobUpdates();
 
     control.replay();
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(2, 1, 1, true),
-        1);
 
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(2, 1, 1));
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("CPU"));
   }
@@ -247,14 +239,12 @@ public class QuotaManagerImplTest extends EasyMockTest {
   @Test
   public void testCheckQuotaExceedsRam() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectTasks(createProdTask("foo", 3, 3, 3));
+    expectNoJobUpdates();
 
     control.replay();
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 2, 1, true),
-        1);
 
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 2, 1));
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("RAM"));
   }
@@ -262,19 +252,127 @@ public class QuotaManagerImplTest extends EasyMockTest {
   @Test
   public void testCheckQuotaExceedsDisk() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
-    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    expectTasks(createProdTask("foo", 3, 3, 3));
+    expectNoJobUpdates();
 
     control.replay();
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(
-        ImmutableMap.<ITaskConfig, Integer>of(),
-        createTaskConfig(1, 1, 2, true),
-        1);
 
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 2));
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("DISK"));
   }
 
   @Test
+  public void testCheckQuotaUpdatingTasksFilteredOut() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createTask(JOB_NAME, "id2", 3, 3, 3, true, 0))
+        .times(2);
+
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(2, 2, 2, true), 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaNonProdUpdatesUnaccounted() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
+
+    expectJobUpdates(createTaskConfig(8, 8, 8, false), createTaskConfig(4, 4, 4, false), 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaProdToNonUpdateUnaccounted() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 1, 1, 1)).times(2);
+
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(4, 4, 4, false), 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaNonToProdUpdateExceedsQuota() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
+
+    expectJobUpdates(createTaskConfig(1, 1, 1, false), createTaskConfig(1, 1, 1, true), 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(5, 5, 5)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaOldJobUpdateConfigMatters() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
+    expectJobUpdates(createTaskConfig(2, 2, 2, true), createTaskConfig(1, 1, 1, true), 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaUpdateAddsInstances() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), 1, createTaskConfig(1, 1, 1, true), 2, 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaUpdateRemovesInstances() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
+    expectJobUpdates(createTaskConfig(1, 1, 1, true), 2, createTaskConfig(1, 1, 1, true), 1, 2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
   public void testSaveQuotaPasses() throws Exception {
     storageUtil.quotaStore.saveQuota(ROLE, QUOTA);
 
@@ -298,6 +396,50 @@ public class QuotaManagerImplTest extends EasyMockTest {
     return storageUtil.expectTaskFetch(ACTIVE_QUERY, tasks);
   }
 
+  private void expectJobUpdates(ITaskConfig initial, ITaskConfig desired) {
+    expectJobUpdates(initial, 1, desired, 1, 1);
+  }
+
+  private void expectJobUpdates(ITaskConfig initial, ITaskConfig desired, int times) {
+    expectJobUpdates(initial, 1, desired, 1, times);
+  }
+
+  private void expectJobUpdates(
+      ITaskConfig initial,
+      int initialInstances,
+      ITaskConfig desired,
+      int desiredInstances,
+      int times) {
+    // Stubs a single job update whose initial/desired states cover instances [0, count - 1].
+    String updateId = "u1";
+    List<IJobUpdateSummary> summaries =
+        ImmutableList.of(IJobUpdateSummary.build(new JobUpdateSummary()
+            .setJobKey(JobKeys.from(initial).newBuilder())
+            .setUpdateId(updateId)));
+
+    IJobUpdate update = IJobUpdate.build(new JobUpdate()
+        .setSummary(summaries.get(0).newBuilder())
+        .setInstructions(new JobUpdateInstructions()
+            .setDesiredState(new InstanceTaskConfig()
+                .setTask(desired.newBuilder())
+                .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
+            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+                .setTask(initial.newBuilder())
+                .setInstances(ImmutableSet.of(new Range(0, initialInstances - 1)))))));
+
+    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getOwner().getRole())))
+        .andReturn(summaries)
+        .times(times);
+
+    expect(jobUpdateStore.fetchJobUpdate(updateId)).andReturn(Optional.of(update)).times(times);
+
+  }
+
+  private void expectNoJobUpdates() {
+    expect(jobUpdateStore.fetchJobUpdateSummaries(
+        QuotaManagerImpl.updateQuery(ROLE))).andReturn(ImmutableList.<IJobUpdateSummary>of());
+  }
+
   private IExpectationSetters<?> expectNoTasks() {
     return expectTasks();
   }
@@ -307,25 +449,35 @@ public class QuotaManagerImplTest extends EasyMockTest {
         .andReturn(Optional.of(quota));
   }
 
+  private IResourceAggregate prodResource(double cpu, long ram, long disk) {
+    return IResourceAggregate.build(new ResourceAggregate(cpu, ram, disk));
+  }
+
   private ITaskConfig createTaskConfig(int cpus, int ramMb, int diskMb, boolean production) {
-    return createTask("newTask", "newId", cpus, ramMb, diskMb, production)
+    return createTask(JOB_NAME, "newId", cpus, ramMb, diskMb, production, 0)
         .getAssignedTask()
         .getTask();
   }
 
+  private IScheduledTask createProdTask(String jobName, int cpus, int ramMb, int diskMb) {
+    return createTask(jobName, jobName + "id1", cpus, ramMb, diskMb, true, 0);
+  }
+
   private IScheduledTask createTask(
       String jobName,
       String taskId,
       int cpus,
       int ramMb,
       int diskMb,
-      boolean production) {
+      boolean production,
+      int instanceId) {
 
     return IScheduledTask.build(new ScheduledTask()
         .setStatus(ScheduleStatus.RUNNING)
         .setAssignedTask(
             new AssignedTask()
                 .setTaskId(taskId)
+                .setInstanceId(instanceId)
                 .setTask(new TaskConfig()
                     .setOwner(new Identity(ROLE, ROLE))
                     .setEnvironment(ENV)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
deleted file mode 100644
index 8f18617..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskLimitValidatorTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.quota.QuotaCheckResult;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskLimitValidatorImpl;
-import org.apache.aurora.scheduler.state.TaskLimitValidator.TaskValidationException;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-
-public class TaskLimitValidatorTest extends EasyMockTest {
-  private static final Identity JIM = new Identity("jim", "jim-user");
-  private static final String MY_JOB = "myJob";
-  private static final String TASK_ID = "a";
-
-  private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
-  private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
-
-  private TaskIdGenerator taskIdGenerator;
-  private QuotaManager quotaManager;
-  private TaskLimitValidatorImpl taskLimitValidator;
-
-  @Before
-  public void setUp() {
-    taskIdGenerator = createMock(TaskIdGenerator.class);
-    quotaManager = createMock(QuotaManager.class);
-    taskLimitValidator = new TaskLimitValidatorImpl(taskIdGenerator, quotaManager);
-  }
-
-  @Test
-  public void testValidateTask() throws Exception {
-    ITaskConfig task = makeTask(JIM, MY_JOB);
-    expect(taskIdGenerator.generate(task, 1)).andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(
-        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
-        anyObject(ITaskConfig.class),
-        anyInt())).andStubReturn(ENOUGH_QUOTA);
-
-    control.replay();
-
-    taskLimitValidator.validateTaskLimits(task, 1);
-  }
-
-  @Test(expected = TaskValidationException.class)
-  public void testValidatesFailsTaskIdTooLong() throws Exception {
-    ITaskConfig task = makeTask(JIM, MY_JOB);
-    expect(taskIdGenerator.generate(task, 1))
-        .andReturn(Strings.repeat(TASK_ID, TaskLimitValidatorImpl.MAX_TASK_ID_LENGTH + 1));
-
-    control.replay();
-
-    taskLimitValidator.validateTaskLimits(task, 1);
-  }
-
-  @Test(expected = TaskValidationException.class)
-  public void testValidatesFailsTooManyInstances() throws Exception {
-    ITaskConfig task = makeTask(JIM, MY_JOB);
-    expect(taskIdGenerator.generate(task, TaskLimitValidatorImpl.MAX_TASKS_PER_JOB.get() + 1))
-        .andReturn(TASK_ID);
-
-    control.replay();
-
-    taskLimitValidator.validateTaskLimits(task, TaskLimitValidatorImpl.MAX_TASKS_PER_JOB.get() + 1);
-  }
-
-  @Test(expected = TaskValidationException.class)
-  public void testValidatesFailsQuotaCheck() throws Exception {
-    ITaskConfig task = makeTask(JIM, MY_JOB);
-    expect(taskIdGenerator.generate(task, 1)).andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(
-        EasyMock.<Map<ITaskConfig, Integer>>anyObject(),
-        anyObject(ITaskConfig.class),
-        anyInt())).andStubReturn(NOT_ENOUGH_QUOTA);
-
-    control.replay();
-
-    taskLimitValidator.validateTaskLimits(task, 1);
-  }
-
-  private static ITaskConfig makeTask(Identity owner, String job) {
-    return ITaskConfig.build(new TaskConfig()
-        .setOwner(owner)
-        .setEnvironment(DEFAULT_ENVIRONMENT)
-        .setJobName(job)
-        .setRequestedPorts(ImmutableSet.<String>of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2e822238/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 5aebbfb..6918cba 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.storage.testing;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -30,7 +29,6 @@ import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.Capture;
 import org.easymock.IAnswer;
@@ -141,8 +139,4 @@ public class StorageTestUtil {
   public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
     return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
   }
-
-  public IExpectationSetters<?> expectQuotaFetch(String role, Optional<IResourceAggregate> result) {
-    return expect(quotaStore.fetchQuota(role)).andReturn(result);
-  }
 }


Mime
View raw message