aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] git commit: Client quota checks. Part 2: server side changes.
Date Wed, 15 Jan 2014 21:06:18 GMT
Client quota checks. Part 2: server side changes.

Reviewed at https://reviews.apache.org/r/16629/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e2b357e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e2b357e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e2b357e7

Branch: refs/heads/master
Commit: e2b357e7fc8cfd29f703f9f9d6f7e1b84babf25b
Parents: a6ab7fd
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Wed Jan 15 13:04:54 2014 -0800
Committer: Maxim Khutornenko <mkhutornenko@twitter.com>
Committed: Wed Jan 15 13:04:54 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/http/SchedulerzRole.java   |  24 +--
 .../scheduler/quota/QuotaCheckResult.java       | 117 +++++++++++
 .../scheduler/quota/QuotaComparisonResult.java  |  90 ---------
 .../aurora/scheduler/quota/QuotaFilter.java     |  85 --------
 .../aurora/scheduler/quota/QuotaInfo.java       |  62 ++++++
 .../aurora/scheduler/quota/QuotaManager.java    | 147 ++++++++++----
 .../aurora/scheduler/quota/QuotaModule.java     |   4 -
 .../apache/aurora/scheduler/quota/Quotas.java   |  90 ---------
 .../aurora/scheduler/state/JobFilter.java       | 134 -------------
 .../scheduler/state/SchedulerCoreImpl.java      |  96 +++++----
 .../aurora/scheduler/storage/Storage.java       |  18 --
 .../storage/testing/StorageTestUtil.java        |   6 +
 .../thrift/SchedulerThriftInterface.java        |  63 +++---
 .../scheduler/quota/QuotaCheckResultTest.java   |  88 +++++++++
 .../quota/QuotaComparisonResultTest.java        |  88 ---------
 .../aurora/scheduler/quota/QuotaFilterTest.java | 189 ------------------
 .../scheduler/quota/QuotaManagerImplTest.java   | 197 ++++++++++++-------
 .../state/BaseSchedulerCoreImplTest.java        |  39 ++--
 .../thrift/SchedulerThriftInterfaceTest.java    |  43 +++-
 .../aurora/scheduler/thrift/ThriftIT.java       |   7 +-
 20 files changed, 663 insertions(+), 924 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
index 785efd0..b0caca7 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
@@ -36,7 +36,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -58,13 +57,12 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.quota.Quotas;
 import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -152,26 +150,14 @@ public class SchedulerzRole extends JerseyTemplateServlet {
         template.setAttribute("cronJobs", cronJobs.values());
 
         // TODO(Suman Karumuri): In future compute consumption for role and environment.
-        template.setAttribute("prodResourcesUsed", quotaManager.getConsumption(role.get()));
-        template.setAttribute("nonProdResourcesUsed", getNonProdConsumption(role.get()));
-        template.setAttribute("resourceQuota", getQuota(role.get()));
+        QuotaInfo quotaInfo = quotaManager.getQuotaInfo(role.get());
+        template.setAttribute("prodResourcesUsed", quotaInfo.prodConsumption());
+        template.setAttribute("nonProdResourcesUsed", quotaInfo.nonProdConsumption());
+        template.setAttribute("resourceQuota", quotaInfo.guota());
       }
     });
   }
 
-  private IQuota getQuota(final String role) {
-    return Storage.Util.consistentFetchQuota(storage, role).or(Quotas.noQuota());
-  }
-
-  private IQuota getNonProdConsumption(String role) {
-    FluentIterable<ITaskConfig> tasks = FluentIterable
-        .from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.roleScoped(role).active()))
-        .transform(Tasks.SCHEDULED_TO_INFO)
-        .filter(Predicates.not(Tasks.IS_PRODUCTION));
-
-    return Quotas.fromTasks(tasks);
-  }
-
   /**
    * Display jobs for a role and environment.
    */

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
new file mode 100644
index 0000000..c898535
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Calculates and formats detailed quota comparison result.
+ */
+public class QuotaCheckResult {
+
+  /**
+   * Quota check result.
+   */
+  public static enum Result {
+    /**
+     * There is sufficient quota for the requested operation.
+     */
+    SUFFICIENT_QUOTA,
+
+    /**
+     * There is not enough allocated quota for the requested operation.
+     */
+    INSUFFICIENT_QUOTA
+  }
+
+  private static enum Resource {
+    CPU("core(s)"),
+    RAM("MB"),
+    DISK("MB");
+
+    private final String unit;
+    private Resource(String unit) {
+      this.unit = unit;
+    }
+  }
+
+  private final Optional<String> details;
+  private final Result result;
+
+  @VisibleForTesting
+  public QuotaCheckResult(Result result) {
+    this(result, Optional.<String>absent());
+  }
+
+  private QuotaCheckResult(Result result, Optional<String> details) {
+    this.result = checkNotNull(result);
+    this.details = checkNotNull(details);
+  }
+
+  /**
+   * Gets quota check result.
+   *
+   * @return Quota check result.
+   */
+  public Result getResult() {
+    return result;
+  }
+
+  /**
+   * Gets detailed quota violation description in case quota check fails.
+   *
+   * @return Quota check details.
+   */
+  public Optional<String> getDetails() {
+    return details;
+  }
+
+  static QuotaCheckResult greaterOrEqual(IQuota a, IQuota b) {
+    StringBuilder details = new StringBuilder();
+    boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details)
+        & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details)
+        & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
+
+    return new QuotaCheckResult(
+        result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA,
+        Optional.of(details.toString()));
+  }
+
+  private static boolean compare(
+      double a,
+      double b,
+      Resource resource,
+      StringBuilder details) {
+
+    boolean result = a >= b;
+    if (!result) {
+      details
+          .append(details.length() > 0 ? "; " : "")
+          .append(resource)
+          .append(" quota exceeded by ")
+          .append(String.format("%.2f", b - a))
+          .append(" ")
+          .append(resource.unit);
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
deleted file mode 100644
index 99d2e4c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-
-/**
- * Calculates and formats detailed quota comparison result.
- */
-class QuotaComparisonResult {
-
-  private enum Resource {
-    CPU("core(s)"),
-    RAM("MB"),
-    DISK("MB");
-
-    private final String unit;
-    private Resource(String unit) {
-      this.unit = unit;
-    }
-  }
-
-  enum Result {
-    SUFFICIENT_QUOTA,
-    INSUFFICIENT_QUOTA
-  }
-
-  private final String details;
-  private final Result result;
-
-  @VisibleForTesting
-  QuotaComparisonResult(Result result, String details) {
-    this.result = result;
-    this.details = details;
-  }
-
-  Result result() {
-    return result;
-  }
-
-  String details() {
-    return details;
-  }
-
-  static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
-    StringBuilder details = new StringBuilder();
-    boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details)
-        & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details)
-        & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
-
-    return new QuotaComparisonResult(
-        result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA,
-        details.toString());
-  }
-
-  private static boolean compare(
-      double a,
-      double b,
-      Resource resource,
-      StringBuilder details) {
-
-    boolean result = a >= b;
-    if (!result) {
-      details
-          .append(details.length() > 0 ? "; " : "")
-          .append(resource)
-          .append(" quota exceeded by ")
-          .append(String.format("%.2f", b - a))
-          .append(" ")
-          .append(resource.unit);
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
deleted file mode 100644
index 6ab7982..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.state.JobFilter;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-
-/**
- * A filter that fails production jobs for roles that do not have sufficient quota to run them.
- */
-class QuotaFilter implements JobFilter {
-  private final QuotaManagerImpl quotaManager;
-  private final Storage storage;
-
-  @Inject
-  QuotaFilter(QuotaManagerImpl quotaManager, Storage storage) {
-    this.quotaManager = checkNotNull(quotaManager);
-    this.storage = checkNotNull(storage);
-  }
-
-  @Override
-  public JobFilterResult filter(final IJobConfiguration job) {
-    return filterByTask(job.getKey(), job.getTaskConfig(), job.getInstanceCount());
-  }
-
-  @Override
-  public JobFilterResult filter(ITaskConfig template, int instanceCount) {
-    return filterByTask(JobKeys.from(template), template, instanceCount);
-  }
-
-  private synchronized JobFilterResult filterByTask(
-      IJobKey jobKey,
-      ITaskConfig template,
-      int instanceCount) {
-
-    if (!template.isProduction()) {
-      return JobFilterResult.pass();
-    }
-
-    IQuota currentUsage = Quotas.fromProductionTasks(
-        Iterables.transform(
-            Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()),
-            Tasks.SCHEDULED_TO_INFO));
-
-    IQuota additionalRequested =
-        Quotas.subtract(Quotas.fromTasks(template, instanceCount), currentUsage);
-    QuotaComparisonResult comparisonResult =
-        quotaManager.checkQuota(jobKey.getRole(), additionalRequested);
-
-    if (comparisonResult.result() == INSUFFICIENT_QUOTA) {
-      return JobFilterResult.fail("Insufficient resource quota: " + comparisonResult.details());
-    }
-
-    return JobFilterResult.pass();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
new file mode 100644
index 0000000..0e286d8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Wraps allocated quota and consumption details.
+ */
+public class QuotaInfo {
+  private final IQuota quota;
+  private final IQuota prodConsumption;
+  private final IQuota nonProdConsumption;
+
+  QuotaInfo(IQuota quota, IQuota prodConsumption, IQuota nonProdConsumption) {
+    this.quota = checkNotNull(quota);
+    this.prodConsumption = checkNotNull(prodConsumption);
+    this.nonProdConsumption = checkNotNull(nonProdConsumption);
+  }
+
+  /**
+   * Total quota available.
+   *
+   * @return Available quota.
+   */
+  public IQuota guota() {
+    return quota;
+  }
+
+  /**
+   * Quota consumed by production jobs.
+   *
+   * @return Production job consumption.
+   */
+  public IQuota prodConsumption() {
+    return prodConsumption;
+  }
+
+  /**
+   * Quota consumed by non-production jobs.
+   *
+   * @return Non production job consumption.
+   */
+  public IQuota nonProdConsumption() {
+    return nonProdConsumption;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 6b0645b..c93e57f 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -15,76 +15,149 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 
+import org.apache.aurora.gen.Quota;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IQuota;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 
 /**
  * Allows access to resource quotas, and tracks quota consumption.
  */
 public interface QuotaManager {
   /**
-   * Fetches the current resource usage for the role.
+   * Saves a new quota for the provided role or overrides the existing one.
+   *
+   * @param role Quota owner.
+   * @param quota Quota to save.
+   * @throws QuotaException If provided quota specification is invalid.
+   */
+  void saveQuota(String role, IQuota quota) throws QuotaException;
+
+  /**
+   * Gets {@code QuotaInfo} for the specified role.
+   *
+   * @param role Quota owner.
+   * @return {@code QuotaInfo} instance.
+   */
+  QuotaInfo getQuotaInfo(String role);
+
+  /**
+   * Checks if there is enough resource quota available for adding {@code instances}
+   * of {@code template} tasks. The quota is defined at the task owner (role) level.
    *
-   * @param role to fetch quota usage for.
-   * @return Resource quota used by {@code role}.
+   * @param template Single task resource requirements.
+   * @param instances Number of task instances.
+   * @return {@code QuotaComparisonResult} instance with quota check result details.
    */
-  IQuota getConsumption(String role);
+  QuotaCheckResult checkQuota(ITaskConfig template, int instances);
+
+  /**
+   * Thrown when quota related operation failed.
+   */
+  class QuotaException extends Exception {
+    public QuotaException(String msg) {
+      super(msg);
+    }
+  }
 
   /**
    * Quota provider that stores quotas in the canonical {@link Storage} system.
    */
-  static class QuotaManagerImpl implements QuotaManager {
+  class QuotaManagerImpl implements QuotaManager {
     private final Storage storage;
 
     @Inject
-    public QuotaManagerImpl(Storage storage) {
+    QuotaManagerImpl(Storage storage) {
       this.storage = checkNotNull(storage);
     }
 
     @Override
-    public IQuota getConsumption(final String role) {
-      checkNotBlank(role);
-
-      final Query.Builder query = Query.roleScoped(role).active();
-
-      return storage.consistentRead(
-          new Work.Quiet<IQuota>() {
-            @Override public IQuota apply(StoreProvider storeProvider) {
-              return Quotas.fromProductionTasks(Iterables.transform(
-                  storeProvider.getTaskStore().fetchTasks(query), Tasks.SCHEDULED_TO_INFO));
-            }
-          });
+    public void saveQuota(final String ownerRole, final IQuota quota) throws QuotaException {
+      if (!quota.isSetNumCpus() || !quota.isSetRamMb() || !quota.isSetDiskMb()) {
+        throw new QuotaException("Missing quota specification(s) in: " + quota.toString());
+      }
+
+      if (quota.getNumCpus() < 0.0 || quota.getRamMb() < 0 || quota.getDiskMb() < 0) {
+        throw new QuotaException("Negative values in: " + quota.toString());
+      }
+
+      storage.write(new Storage.MutateWork.NoResult.Quiet() {
+        @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+          storeProvider.getQuotaStore().saveQuota(ownerRole, quota);
+        }
+      });
     }
 
-    /**
-     * Tests whether the role has at least the specified amount of quota available.
-     *
-     * @param role Role to consume quota for.
-     * @param quota Quota amount to check for availability.
-     * @return QuotaComparisonResult with {@code result()} returning {@code true} if the role
-     * currently has at least {@code quota} quota remaining, {@code false} otherwise.
-     */
-    QuotaComparisonResult checkQuota(final String role, final IQuota quota) {
-      checkNotBlank(role);
-      checkNotNull(quota);
-
-      return storage.consistentRead(new Quiet<QuotaComparisonResult>() {
-        @Override public QuotaComparisonResult apply(StoreProvider storeProvider) {
-          IQuota reserved = storeProvider.getQuotaStore().fetchQuota(role).or(Quotas.noQuota());
-          return Quotas.greaterOrEqual(reserved, Quotas.add(getConsumption(role), quota));
+    @Override
+    public QuotaInfo getQuotaInfo(final String role) {
+      return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
+        @Override public QuotaInfo apply(StoreProvider storeProvider) {
+          FluentIterable<ITaskConfig> tasks = FluentIterable
+              .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
+              .transform(Tasks.SCHEDULED_TO_INFO);
+
+          IQuota prodConsumed = fromTasks(tasks.filter(Tasks.IS_PRODUCTION));
+          IQuota nonProdConsumed =
+              fromTasks(tasks.filter(Predicates.not(Tasks.IS_PRODUCTION)));
+
+          IQuota quota = storeProvider.getQuotaStore().fetchQuota(role).or(Quotas.noQuota());
+
+          return new QuotaInfo(quota, prodConsumed, nonProdConsumed);
         }
       });
     }
+
+    @Override
+    public QuotaCheckResult checkQuota(ITaskConfig template, int instances) {
+      if (!template.isProduction()) {
+        return new QuotaCheckResult(SUFFICIENT_QUOTA);
+      }
+
+      QuotaInfo quotaInfo = getQuotaInfo(JobKeys.from(template).getRole());
+
+      IQuota additionalRequested =
+          Quotas.scale(fromTasks(ImmutableSet.of(template)), instances);
+
+      return QuotaCheckResult.greaterOrEqual(
+          quotaInfo.guota(),
+          add(quotaInfo.prodConsumption(), additionalRequested));
+    }
+
+    private static IQuota fromTasks(Iterable<ITaskConfig> tasks) {
+      double cpu = 0;
+      int ramMb = 0;
+      int diskMb = 0;
+      for (ITaskConfig task : tasks) {
+        cpu += task.getNumCpus();
+        ramMb += task.getRamMb();
+        diskMb += task.getDiskMb();
+      }
+
+      return IQuota.build(new Quota()
+          .setNumCpus(cpu)
+          .setRamMb(ramMb)
+          .setDiskMb(diskMb));
+    }
+
+    private static IQuota add(IQuota a, IQuota b) {
+      return IQuota.build(new Quota()
+          .setNumCpus(a.getNumCpus() + b.getNumCpus())
+          .setRamMb(a.getRamMb() + b.getRamMb())
+          .setDiskMb(a.getDiskMb() + b.getDiskMb()));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
index 4a61949..fea2656 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
@@ -20,7 +20,6 @@ import javax.inject.Singleton;
 import com.google.inject.AbstractModule;
 
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.state.JobFilter;
 import org.apache.aurora.scheduler.storage.Storage;
 
 /**
@@ -34,8 +33,5 @@ public class QuotaModule extends AbstractModule {
 
     bind(QuotaManager.class).to(QuotaManagerImpl.class);
     bind(QuotaManagerImpl.class).in(Singleton.class);
-
-    bind(JobFilter.class).to(QuotaFilter.class);
-    bind(QuotaFilter.class).in(Singleton.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
index 24f2093..9da3c3e 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
@@ -15,16 +15,10 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 import org.apache.aurora.gen.Quota;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Convenience class for normalizing resource measures between tasks and offers.
@@ -46,79 +40,6 @@ public final class Quotas {
   }
 
   /**
-   * Determines the amount of quota required for a set of job tasks.
-   *
-   * @param taskConfig Task template to count quota from.
-   * @return Quota requirement to run {@code job}.
-   */
-  public static IQuota fromTasks(ITaskConfig taskConfig, int instanceCount) {
-    return scale(fromProductionTasks(ImmutableSet.of(taskConfig)), instanceCount);
-  }
-
-  // TODO(Suman Karumuri): Refactor this function in to a new class.
-  // TODO(Suman Karumuri): Rename Quota to something more meaningful (ex: ResourceAggregate)
-  /**
-   * Determines the amount of quota required for production tasks among {@code tasks}.
-   *
-   * @param tasks Tasks to count quota from.
-   * @return Quota requirement to run {@code tasks}.
-   */
-  public static IQuota fromProductionTasks(Iterable<ITaskConfig> tasks) {
-    checkNotNull(tasks);
-
-    return fromTasks(Iterables.filter(tasks, Tasks.IS_PRODUCTION));
-  }
-
-  /**
-   * Determines the amount of quota required for the given tasks.
-   *
-   * @param tasks Tasks to count quota from.
-   * @return Quota requirement to run {@code tasks}.
-   */
-  public static IQuota fromTasks(Iterable<ITaskConfig> tasks) {
-    double cpu = 0;
-    int ramMb = 0;
-    int diskMb = 0;
-    for (ITaskConfig task : tasks) {
-      cpu += task.getNumCpus();
-      ramMb += task.getRamMb();
-      diskMb += task.getDiskMb();
-    }
-
-    return IQuota.build(new Quota()
-        .setNumCpus(cpu)
-        .setRamMb(ramMb)
-        .setDiskMb(diskMb));
-  }
-
-  /**
-   * a >= b
-   */
-  public static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
-    return QuotaComparisonResult.greaterOrEqual(a, b);
-  }
-
-  /**
-   * a + b
-   */
-  public static IQuota add(IQuota a, IQuota b) {
-    return IQuota.build(new Quota()
-        .setNumCpus(a.getNumCpus() + b.getNumCpus())
-        .setRamMb(a.getRamMb() + b.getRamMb())
-        .setDiskMb(a.getDiskMb() + b.getDiskMb()));
-  }
-
-  /**
-   * a - b
-   */
-  public static IQuota subtract(IQuota a, IQuota b) {
-    return IQuota.build(new Quota()
-        .setNumCpus(a.getNumCpus() - b.getNumCpus())
-        .setRamMb(a.getRamMb() - b.getRamMb())
-        .setDiskMb(a.getDiskMb() - b.getDiskMb()));
-  }
-
-  /**
    * a * m
    */
   public static IQuota scale(IQuota a, int m) {
@@ -141,15 +62,4 @@ public final class Quotas {
         (double) a.getDiskMb() / b.getDiskMb()
     ).intValue();
   }
-
-  /**
-   * sum(qs)
-   */
-  public static IQuota sum(Iterable<IQuota> qs) {
-    IQuota sum = noQuota();
-    for (IQuota q : qs) {
-      sum = Quotas.add(sum, q);
-    }
-    return sum;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
deleted file mode 100644
index 0d84c1e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * An action that either accepts a configuration or rejects it with a reason.
- */
-public interface JobFilter {
-  /**
-   * Accept the JobConfiguration or reject it with a reason.
-   *
-   * @param jobConfiguration The job configuration to filter.
-   * @return A result and the reason the result was reached.
-   */
-  JobFilterResult filter(IJobConfiguration jobConfiguration);
-
-  /**
-   * Accept the TaskConfig with the specified number of instances
-   * or reject it with a reason.
-   *
-   * @param template The task configuration to filter.
-   * @param instanceCount Number of instances to apply taskConfig to.
-   * @return A result and the reason the result was reached.
-   */
-  JobFilterResult filter(ITaskConfig template, int instanceCount);
-
-  /**
-   * An indication of whether a job passed a filter or not.
-   */
-  public static final class JobFilterResult {
-    private final boolean pass;
-    private final String reason;
-
-    private JobFilterResult(boolean pass, String reason) {
-      this.pass = pass;
-      this.reason = Preconditions.checkNotNull(reason);
-    }
-
-    /**
-     * Create a new result indicating the job has passed the filter.
-     *
-     * @return A result indicating the job has passed with a default reason.
-     * @see #fail(String)
-     */
-    public static JobFilterResult pass() {
-      return new JobFilterResult(true, "Accepted by filter.");
-    }
-
-    /**
-     * Create a new result indicating the job has passed the filter.
-     *
-     * @param reason A reason that the job has passed.
-     * @return A result indicating the job has passed because of the given reason.
-     * @throws NullPointerException if reason is {@code null}.
-     * @see #fail(String)
-     */
-    public static JobFilterResult pass(String reason) {
-      return new JobFilterResult(true, reason);
-    }
-
-    /**
-     * Create a new result indicating the job has failed the filter.
-     *
-     * @param reason A reason that the job has failed.
-     * @return A result indicating the job has failed because of the given reason.
-     * @throws NullPointerException if {@code reason} is {@code null}.
-     * @see #pass()
-     * @see #pass(String)
-     */
-    public static JobFilterResult fail(String reason) {
-      return new JobFilterResult(false, reason);
-    }
-
-    /**
-     * Indicates whether the result indicates a pass.
-     *
-     * @return {@code true} if the result indicates a pass.
-     */
-    public boolean isPass() {
-      return pass;
-    }
-
-    /**
-     * Indicates the reason for the result.
-     *
-     * @return The reason for the result.
-     */
-    public String getReason() {
-      return reason;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof JobFilterResult)) {
-        return false;
-      }
-      JobFilterResult that = (JobFilterResult) o;
-      return Objects.equal(pass, that.pass)
-          && Objects.equal(reason, that.reason);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(pass, reason);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("pass", pass)
-          .add("reason", reason)
-          .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 8dec283..b3f19cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -43,6 +43,8 @@ import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -57,6 +59,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 
 /**
  * Implementation of the scheduler core.
@@ -79,7 +82,7 @@ class SchedulerCoreImpl implements SchedulerCore {
   private final StateManager stateManager;
 
   private final TaskIdGenerator taskIdGenerator;
-  private final JobFilter jobFilter;
+  private final QuotaManager quotaManager;
 
   /**
    * Creates a new core scheduler.
@@ -89,7 +92,7 @@ class SchedulerCoreImpl implements SchedulerCore {
    * @param immediateScheduler Immediate scheduler.
    * @param stateManager Persistent state manager.
    * @param taskIdGenerator Task ID generator.
-   * @param jobFilter Job filter.
+   * @param quotaManager Quota manager.
    */
   @Inject
   public SchedulerCoreImpl(
@@ -98,7 +101,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       ImmediateJobManager immediateScheduler,
       StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
-      JobFilter jobFilter) {
+      QuotaManager quotaManager) {
 
     this.storage = checkNotNull(storage);
 
@@ -108,7 +111,7 @@ class SchedulerCoreImpl implements SchedulerCore {
     this.cronScheduler = cronScheduler;
     this.stateManager = checkNotNull(stateManager);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
-    this.jobFilter = checkNotNull(jobFilter);
+    this.quotaManager = checkNotNull(quotaManager);
   }
 
   private boolean hasActiveJob(IJobConfiguration job) {
@@ -123,30 +126,38 @@ class SchedulerCoreImpl implements SchedulerCore {
   }
 
   @Override
-  public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
+  public synchronized void createJob(final SanitizedConfiguration sanitizedConfiguration)
       throws ScheduleException {
 
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    if (hasActiveJob(job)) {
-      throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
-    }
+    storage.write(new MutateWork.NoResult<ScheduleException>() {
+      @Override protected void execute(MutableStoreProvider storeProvider)
+          throws ScheduleException {
+
+        final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+        if (hasActiveJob(job)) {
+          throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+        }
 
-    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+        validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
 
-    boolean accepted = false;
-    for (final JobManager manager : jobManagers) {
-      if (manager.receiveJob(sanitizedConfiguration)) {
-        LOG.info("Job accepted by manager: " + manager.getUniqueKey());
-        accepted = true;
-        break;
-      }
-    }
+        boolean accepted = false;
+        // TODO(wfarner): Remove the JobManager abstraction, and directly invoke addInstances
+        // here for non-cron jobs.
+        for (final JobManager manager : jobManagers) {
+          if (manager.receiveJob(sanitizedConfiguration)) {
+            LOG.info("Job accepted by manager: " + manager.getUniqueKey());
+            accepted = true;
+            break;
+          }
+        }
 
-    if (!accepted) {
-      LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
-      LOG.severe("Discarded job: " + job);
-      throw new ScheduleException("Job not accepted, discarding.");
-    }
+        if (!accepted) {
+          LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
+          LOG.severe("Discarded job: " + job);
+          throw new ScheduleException("Job not accepted, discarding.");
+        }
+      }
+    });
   }
 
   // This number is derived from the maximum file name length limit on most UNIX systems, less
@@ -155,31 +166,32 @@ class SchedulerCoreImpl implements SchedulerCore {
   @VisibleForTesting
   static final int MAX_TASK_ID_LENGTH = 255 - 90;
 
-  // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
-  private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
+  /**
+   * Validates task specific requirements including name, count and quota checks.
+   * Must be performed inside of a write storage transaction along with state mutation change
+   * to avoid any data race conditions.
+   *
+   * @param task Task configuration.
+   * @param instances Number of task instances
+   * @throws ScheduleException If validation fails.
+   */
+  private void validateTaskLimits(ITaskConfig task, int instances)
       throws ScheduleException {
 
-    int instanceCount = count;
-    if (incremental) {
-      instanceCount +=
-          Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size();
-    }
-
     // TODO(maximk): This is a short-term hack to stop the bleeding from
     //               https://issues.apache.org/jira/browse/MESOS-691
-    if (taskIdGenerator.generate(task, instanceCount).length() > MAX_TASK_ID_LENGTH) {
+    if (taskIdGenerator.generate(task, instances).length() > MAX_TASK_ID_LENGTH) {
       throw new ScheduleException(
           "Task ID is too long, please shorten your role or job name.");
     }
 
-    JobFilter.JobFilterResult filterResult = jobFilter.filter(task, instanceCount);
-    // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
-    if (!filterResult.isPass()) {
-      throw new ScheduleException(filterResult.getReason());
+    if (instances > MAX_TASKS_PER_JOB.get()) {
+      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
     }
 
-    if (instanceCount > MAX_TASKS_PER_JOB.get()) {
-      throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
+    QuotaCheckResult quotaCheck = quotaManager.checkQuota(task, instances);
+    if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
+      throw new ScheduleException("Insufficient resource quota: " + quotaCheck.getDetails());
     }
   }
 
@@ -188,7 +200,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       throws ScheduleException {
 
     IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+    validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
   }
 
   @Override
@@ -197,12 +209,12 @@ class SchedulerCoreImpl implements SchedulerCore {
       final ImmutableSet<Integer> instanceIds,
       final ITaskConfig config) throws ScheduleException {
 
-    runJobFilters(jobKey, config, instanceIds.size(), true);
     storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider)
+      @Override protected void execute(MutableStoreProvider storeProvider)
           throws ScheduleException {
 
+        validateTaskLimits(config, instanceIds.size());
+
         ImmutableSet<IScheduledTask> tasks =
             storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..53d0c85 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -20,13 +20,11 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.BindingAnnotation;
 
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 /**
@@ -302,21 +300,5 @@ public interface Storage {
         }
       });
     }
-
-    /**
-     * Fetch quota for {@code role} from {@code storage} in a consistent read operation.
-     *
-     * @param storage Storage instance to fetch quota from.
-     * @param role Role to fetch quota for.
-     * @return Quota returned from the fetch operation.
-     * @see QuotaStore#fetchQuota(String)
-     */
-    public static Optional<IQuota> consistentFetchQuota(Storage storage, final String role) {
-      return storage.consistentRead(new Work.Quiet<Optional<IQuota>>() {
-        @Override public Optional<IQuota> apply(StoreProvider storeProvider) {
-          return storeProvider.getQuotaStore().fetchQuota(role);
-        }
-      });
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 8fb51d6..6f71fe2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -15,6 +15,7 @@
  */
 package org.apache.aurora.scheduler.storage.testing;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -30,6 +31,7 @@ import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IQuota;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.Capture;
 import org.easymock.IAnswer;
@@ -131,4 +133,8 @@ public class StorageTestUtil {
   public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
     return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
   }
+
+  public IExpectationSetters<?> expectQuotaFetch(String role, Optional<IQuota> result) {
+    return expect(quotaStore.fetchQuota(role)).andReturn(result);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 76caa62..cf9099f 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -96,7 +96,9 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.quota.Quotas;
+import org.apache.aurora.scheduler.quota.QuotaInfo;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
@@ -106,8 +108,6 @@ import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -170,6 +170,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final Recovery recovery;
   private final MaintenanceController maintenance;
   private final CronJobManager cronJobManager;
+  private final QuotaManager quotaManager;
   private final Amount<Long, Time> killTaskInitialBackoff;
   private final Amount<Long, Time> killTaskMaxBackoff;
 
@@ -182,7 +183,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       StorageBackup backup,
       Recovery recovery,
       CronJobManager cronJobManager,
-      MaintenanceController maintenance) {
+      MaintenanceController maintenance,
+      QuotaManager quotaManager) {
 
     this(storage,
         schedulerCore,
@@ -192,6 +194,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         recovery,
         maintenance,
         cronJobManager,
+        quotaManager,
         KILL_TASK_INITIAL_BACKOFF.get(),
         KILL_TASK_MAX_BACKOFF.get());
   }
@@ -206,6 +209,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       Recovery recovery,
       MaintenanceController maintenance,
       CronJobManager cronJobManager,
+      QuotaManager quotaManager,
       Amount<Long, Time> initialBackoff,
       Amount<Long, Time> maxBackoff) {
 
@@ -217,6 +221,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.recovery = checkNotNull(recovery);
     this.maintenance = checkNotNull(maintenance);
     this.cronJobManager = checkNotNull(cronJobManager);
+    this.quotaManager = checkNotNull(quotaManager);
     this.killTaskInitialBackoff = checkNotNull(initialBackoff);
     this.killTaskMaxBackoff = checkNotNull(maxBackoff);
   }
@@ -308,7 +313,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       SanitizedConfiguration sanitized =
           SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
 
-      // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
+      // TODO(maximk): Drop it once migration to client quota checks is completed.
       if (validation != null && validation == JobConfigValidation.RUN_FILTERS) {
         schedulerCore.validateJobResources(sanitized);
       }
@@ -590,16 +595,28 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   public Response getQuota(final String ownerRole) {
     checkNotBlank(ownerRole);
 
-    IQuota quota = storage.consistentRead(new Work.Quiet<IQuota>() {
-      @Override public IQuota apply(StoreProvider storeProvider) {
-        return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
-      }
-    });
+    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole);
+    GetQuotaResult result = new GetQuotaResult(quotaInfo.guota().newBuilder())
+        .setConsumed(quotaInfo.prodConsumption().newBuilder());
 
-    return new Response()
-        .setResponseCode(OK)
-        .setResult(Result.getQuotaResult(new GetQuotaResult()
-            .setQuota(quota.newBuilder())));
+    return new Response().setResponseCode(OK).setResult(Result.getQuotaResult(result));
+  }
+
+
+  @Requires(whitelist = Capability.PROVISIONER)
+  @Override
+  public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
+    checkNotBlank(ownerRole);
+    checkNotNull(quota);
+    checkNotNull(session);
+
+    Response response = new Response();
+    try {
+      quotaManager.saveQuota(ownerRole, IQuota.build(quota));
+      return response.setResponseCode(OK).setMessage("Quota applied.");
+    } catch (QuotaException e) {
+      return response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage());
+    }
   }
 
   @Override
@@ -634,24 +651,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               .setStatuses(maintenance.endMaintenance(hosts.getHostNames()))));
   }
 
-  @Requires(whitelist = Capability.PROVISIONER)
-  @Override
-  public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
-    checkNotBlank(ownerRole);
-    checkNotNull(quota);
-    checkNotNull(session);
-
-    // TODO(Kevin Sweeney): Input validation for Quota.
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
-      }
-    });
-
-    return new Response().setResponseCode(OK).setMessage("Quota applied.");
-  }
-
   @Override
   public Response forceTaskState(
       String taskId,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
new file mode 100644
index 0000000..e366026
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import org.apache.aurora.gen.Quota;
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class QuotaCheckResultTest {
+
+  @Test
+  public void testGreaterOrEqualPass() {
+    IQuota quota = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    IQuota request = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    assertEquals(SUFFICIENT_QUOTA, QuotaCheckResult.greaterOrEqual(quota, request).getResult());
+  }
+
+  @Test
+  public void testGreaterOrEqualFailsCpu() {
+    IQuota quota = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    IQuota request = IQuota.build(new Quota()
+            .setNumCpus(2.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+    assertTrue(result.getDetails().get().contains("CPU"));
+  }
+
+  @Test
+  public void testGreaterOrEqualFailsRam() {
+    IQuota quota = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    IQuota request = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(512L)
+            .setDiskMb(512L));
+    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+    assertTrue(result.getDetails().get().length() > 0);
+    assertTrue(result.getDetails().get().contains("RAM"));
+  }
+
+  @Test
+  public void testGreaterOrEqualFailsDisk() {
+    IQuota quota = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(512L));
+    IQuota request = IQuota.build(new Quota()
+            .setNumCpus(1.0)
+            .setRamMb(256L)
+            .setDiskMb(1024L));
+    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+    assertTrue(result.getDetails().get().length() > 0);
+    assertTrue(result.getDetails().get().contains("DISK"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
deleted file mode 100644
index 23069b8..0000000
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import org.apache.aurora.gen.Quota;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class QuotaComparisonResultTest {
-
-  @Test
-  public void testGreaterOrEqualPass() {
-    IQuota quota = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    IQuota request = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    assertEquals(SUFFICIENT_QUOTA, QuotaComparisonResult.greaterOrEqual(quota, request).result());
-  }
-
-  @Test
-  public void testGreaterOrEqualFailsCpu() {
-    IQuota quota = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    IQuota request = IQuota.build(new Quota()
-            .setNumCpus(2.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
-    assertEquals(INSUFFICIENT_QUOTA, result.result());
-    assertTrue(result.details().contains("CPU"));
-  }
-
-  @Test
-  public void testGreaterOrEqualFailsRam() {
-    IQuota quota = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    IQuota request = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(512L)
-            .setDiskMb(512L));
-    QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
-    assertEquals(INSUFFICIENT_QUOTA, result.result());
-    assertTrue(result.details().length() > 0);
-    assertTrue(result.details().contains("RAM"));
-  }
-
-  @Test
-  public void testGreaterOrEqualFailsDisk() {
-    IQuota quota = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(512L));
-    IQuota request = IQuota.build(new Quota()
-            .setNumCpus(1.0)
-            .setRamMb(256L)
-            .setDiskMb(1024L));
-    QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
-    assertEquals(INSUFFICIENT_QUOTA, result.result());
-    assertTrue(result.details().length() > 0);
-    assertTrue(result.details().contains("DISK"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
deleted file mode 100644
index b1d878e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.Quota;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class QuotaFilterTest extends EasyMockTest {
-  private static final int DEFAULT_TASKS_IN_QUOTA = 10;
-  private static final String ROLE = "test";
-  private static final String JOB_NAME = "test_job";
-  private static final String ENV = "test_env";
-  private static final IJobKey JOB_KEY = JobKeys.from(ROLE, ENV, JOB_NAME);
-  private static final Query.Builder QUERY = Query.jobScoped(JOB_KEY).active();
-  private static final IQuota QUOTA = IQuota.build(new Quota()
-      .setNumCpus(1.0)
-      .setRamMb(256L)
-      .setDiskMb(512L));
-  private static final ITaskConfig TASK_CONFIG = ITaskConfig.build(new TaskConfig()
-      .setNumCpus(QUOTA.getNumCpus())
-      .setRamMb(QUOTA.getRamMb())
-      .setDiskMb(QUOTA.getDiskMb()));
-  private static final IJobConfiguration JOB = IJobConfiguration.build(new JobConfiguration()
-      .setKey(JOB_KEY.newBuilder())
-      .setInstanceCount(1)
-      .setTaskConfig(TASK_CONFIG.newBuilder()));
-
-  private QuotaFilter quotaFilter;
-
-  private QuotaManagerImpl quotaManager;
-  private StorageTestUtil storageTestUtil;
-  private QuotaComparisonResult quotaCompResult;
-
-  @Before
-  public void setUp() {
-    quotaManager = createMock(QuotaManagerImpl.class);
-    quotaCompResult = createMock(QuotaComparisonResult.class);
-    storageTestUtil = new StorageTestUtil(this);
-
-    quotaFilter = new QuotaFilter(quotaManager, storageTestUtil.storage);
-  }
-
-  @Test
-  public void testNonProductionPasses() {
-    JobConfiguration jobBuilder = JOB.newBuilder();
-    jobBuilder.getTaskConfig().setProduction(false);
-    IJobConfiguration job = IJobConfiguration.build(jobBuilder);
-
-    control.replay();
-
-    assertTrue(quotaFilter.filter(job).isPass());
-  }
-
-  @Test
-  public void testCreateProductionJobChecksQuota() {
-    JobConfiguration jobBuilder = JOB.newBuilder();
-    jobBuilder.getTaskConfig().setProduction(true);
-    IJobConfiguration job = IJobConfiguration.build(jobBuilder);
-
-    storageTestUtil.expectOperations();
-    storageTestUtil.expectTaskFetch(QUERY).times(2);
-
-    expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
-    expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
-    expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
-    expect(quotaCompResult.result()).andReturn(INSUFFICIENT_QUOTA);
-    expect(quotaCompResult.details()).andReturn("Details");
-
-    control.replay();
-
-    assertTrue(quotaFilter.filter(job).isPass());
-    assertFalse(quotaFilter.filter(job).isPass());
-  }
-
-  @Test
-  public void testUpdateProductionJobChecksQuota() {
-    JobConfiguration jobBuilder = JOB.newBuilder();
-    jobBuilder.getTaskConfig().setProduction(true);
-
-    storageTestUtil.expectOperations();
-    storageTestUtil.expectTaskFetch(QUERY,
-        IScheduledTask.build(new ScheduledTask().setAssignedTask(
-            new AssignedTask().setTask(jobBuilder.getTaskConfig()))));
-
-    expect(quotaManager.checkQuota(ROLE, IQuota.build(new Quota(0, 0, 0))))
-        .andReturn(quotaCompResult);
-    expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
-
-    control.replay();
-
-    assertTrue(quotaFilter.filter(IJobConfiguration.build(jobBuilder)).isPass());
-  }
-
-  @Test
-  public void testIncreaseShardsExceedsQuota() {
-    int numTasks = DEFAULT_TASKS_IN_QUOTA;
-    int additionalTasks = 1;
-
-    JobConfiguration jobBuilder = JOB.newBuilder().setInstanceCount(numTasks + additionalTasks);
-    jobBuilder.getTaskConfig().setProduction(true);
-
-    IScheduledTask[] scheduledTasks = new IScheduledTask[numTasks];
-    for (int i = 0; i < numTasks; i++) {
-      ScheduledTask builder = new ScheduledTask().setAssignedTask(
-          new AssignedTask().setTask(jobBuilder.getTaskConfig()));
-      builder.getAssignedTask().setInstanceId(i);
-      scheduledTasks[i] = IScheduledTask.build(builder);
-    }
-
-    storageTestUtil.expectOperations();
-    storageTestUtil.expectTaskFetch(QUERY, scheduledTasks);
-
-    expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
-    expect(quotaCompResult.result()).andReturn(INSUFFICIENT_QUOTA);
-    expect(quotaCompResult.details()).andReturn("Details");
-
-    control.replay();
-
-    assertFalse(quotaFilter.filter(IJobConfiguration.build(jobBuilder)).isPass());
-  }
-
-  @Test
-  public void testUpdateProductionTasksChecksQuota() {
-    JobConfiguration jobBuilder = JOB.newBuilder();
-    TaskConfig config = jobBuilder.getTaskConfig()
-        .setProduction(true)
-        .setOwner(new Identity(ROLE, "user"))
-        .setEnvironment(ENV)
-        .setJobName(JOB_NAME);
-
-    storageTestUtil.expectOperations();
-    storageTestUtil.expectTaskFetch(QUERY,
-        IScheduledTask.build(new ScheduledTask().setAssignedTask(
-            new AssignedTask().setTask(jobBuilder.getTaskConfig()))));
-
-    expect(quotaManager.checkQuota(ROLE, IQuota.build(new Quota(0, 0, 0))))
-        .andReturn(quotaCompResult);
-    expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
-
-    control.replay();
-
-    assertTrue(quotaFilter.filter(ITaskConfig.build(config), 1).isPass());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testUpdateProductionTasksFailsJobKeyCreation() {
-    JobConfiguration jobBuilder = JOB.newBuilder();
-    jobBuilder.getTaskConfig().setOwner(new Identity(ROLE, "user"));
-    control.replay();
-
-    quotaFilter.filter(ITaskConfig.build(jobBuilder.getTaskConfig()), 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index f971aa1..82e17df 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -25,26 +25,32 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
 import org.apache.aurora.scheduler.storage.entities.IQuota;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class QuotaManagerImplTest extends EasyMockTest {
-  private static final String ROLE = "foo";
+  private static final String ROLE = "test";
+  private static final String ENV = "test_env";
+  private static final IQuota QUOTA = IQuota.build(new Quota()
+      .setNumCpus(1.0)
+      .setRamMb(100L)
+      .setDiskMb(200L));
   private static final Query.Builder ACTIVE_QUERY = Query.roleScoped(ROLE).active();
 
   private StorageTestUtil storageUtil;
-  // TODO(maximk): Move checkQuota to QuotaFilter along with tests.
   private QuotaManagerImpl quotaManager;
 
   @Before
@@ -54,125 +60,172 @@ public class QuotaManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testGetEmptyQuota() {
+  public void testGetQuotaInfo() {
+    IScheduledTask prodTask = createTask("foo", "id1", 3, 3, 3, true);
+    IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false);
+    IQuota quota = IQuota.build(new Quota(4, 4, 4));
+
+    expectQuota(quota);
+    expectTasks(prodTask, nonProdTask);
     storageUtil.expectOperations();
-    returnNoTasks();
 
     control.replay();
 
-    assertEquals(Quotas.noQuota(), quotaManager.getConsumption(ROLE));
+    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
+    assertEquals(quota, quotaInfo.guota());
+    assertEquals(IQuota.build(new Quota(3, 3, 3)), quotaInfo.prodConsumption());
+    assertEquals(IQuota.build(new Quota(2, 2, 2)), quotaInfo.nonProdConsumption());
   }
 
   @Test
-  public void testConsumeNoQuota() {
+  public void testGetQuotaInfoNoTasks() {
+    IQuota quota = IQuota.build(new Quota(4, 4, 4));
+
+    expectQuota(quota);
+    expectNoTasks();
     storageUtil.expectOperations();
-    applyQuota(new Quota(1, 1, 1));
-    returnNoTasks();
 
     control.replay();
 
-    assertEquals(SUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, Quotas.noQuota()).result());
+    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
+    assertEquals(quota, quotaInfo.guota());
+    assertEquals(Quotas.noQuota(), quotaInfo.prodConsumption());
+    assertEquals(Quotas.noQuota(), quotaInfo.nonProdConsumption());
   }
 
   @Test
-  public void testNoQuotaExhausted() {
+  public void testCheckQuotaPasses() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
     storageUtil.expectOperations();
-    returnNoTasks();
-    expect(storageUtil.quotaStore.fetchQuota(ROLE)).andReturn(Optional.<IQuota>absent());
 
     control.replay();
 
-    QuotaComparisonResult result =
-        quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 1, 1)));
-
-    assertEquals(INSUFFICIENT_QUOTA, result.result());
-    assertTrue(result.details().length() > 0);
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testUseAllQuota() {
-    IScheduledTask task1 = createTask("foo", "id1", 1, 1, 1);
-    IScheduledTask task2 = createTask("foo", "id2", 1, 1, 1);
-
+  public void testCheckQuotaPassesNoTasks() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectNoTasks();
     storageUtil.expectOperations();
-    applyQuota(new Quota(2, 2, 2)).anyTimes();
-    returnTasks(task1);
-    returnTasks(task1, task2);
 
     control.replay();
 
-    IQuota half = IQuota.build(new Quota(1, 1, 1));
-    assertEquals(SUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, half).result());
-    assertEquals(INSUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, half).result());
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testExhaustCpu() {
+  public void testCheckQuotaPassesNonProdUnaccounted() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectTasks(createTask("foo", "id1", 3, 3, 3, true), createTask("bar", "id2", 5, 5, 5, false));
     storageUtil.expectOperations();
-    applyQuota(new Quota(2, 2, 2));
-    returnTasks(createTask("foo", "id1", 1, 1, 1));
 
     control.replay();
 
-    assertEquals(
-        INSUFFICIENT_QUOTA,
-        quotaManager.checkQuota(ROLE, IQuota.build(new Quota(2, 1, 1))).result());
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
-  public void testExhaustRam() {
+  public void testCheckQuotaSkippedForNonProdRequest() {
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, false), 1);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+  }
+
+  @Test
+  public void testCheckQuotaNoQuotaSet() {
+    expect(storageUtil.quotaStore.fetchQuota(ROLE)).andReturn(Optional.<IQuota>absent());
+    expectNoTasks();
     storageUtil.expectOperations();
-    applyQuota(new Quota(2, 2, 2));
-    returnTasks(createTask("foo", "id1", 1, 1, 1));
 
     control.replay();
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+  }
+
+  @Test
+  public void testCheckQuotaExceedsCpu() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    storageUtil.expectOperations();
 
-    assertEquals(
-        INSUFFICIENT_QUOTA,
-        quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 2, 1))).result());
+    control.replay();
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(2, 1, 1, true), 1);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertTrue(checkQuota.getDetails().get().contains("CPU"));
   }
 
   @Test
-  public void testExhaustDisk() {
+  public void testCheckQuotaExceedsRam() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
     storageUtil.expectOperations();
-    applyQuota(new Quota(2, 2, 2));
-    returnTasks(createTask("foo", "id1", 1, 1, 1));
 
     control.replay();
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 2, 1, true), 1);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertTrue(checkQuota.getDetails().get().contains("RAM"));
+  }
 
-    assertEquals(
-        INSUFFICIENT_QUOTA,
-        quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 1, 2))).result());
+  @Test
+  public void testCheckQuotaExceedsDisk() {
+    expectQuota(IQuota.build(new Quota(4, 4, 4)));
+    expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+    storageUtil.expectOperations();
+
+    control.replay();
+    QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 2, true), 1);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertTrue(checkQuota.getDetails().get().contains("DISK"));
   }
 
   @Test
-  public void testNonproductionUnaccounted() {
-    ScheduledTask builder = createTask("foo", "id1", 3, 3, 3).newBuilder();
-    builder.getAssignedTask().getTask().setProduction(false);
-    IScheduledTask task = IScheduledTask.build(builder);
+  public void testSaveQuotaPasses() throws Exception {
+    storageUtil.quotaStore.saveQuota(ROLE, QUOTA);
+    storageUtil.expectOperations();
+
+    control.replay();
+    quotaManager.saveQuota(ROLE, QUOTA);
+  }
 
+  @Test(expected = QuotaException.class)
+  public void testSaveQuotaFailsMissingSpecs() throws Exception {
     storageUtil.expectOperations();
-    applyQuota(new Quota(2, 2, 2));
-    returnTasks(task);
 
     control.replay();
+    quotaManager.saveQuota(ROLE, IQuota.build(new Quota()));
+  }
+
+  @Test(expected = QuotaException.class)
+  public void testSaveQuotaFailsNegativeValues() throws Exception {
+    storageUtil.expectOperations();
 
-    assertEquals(
-        SUFFICIENT_QUOTA,
-        quotaManager.checkQuota(ROLE, IQuota.build(new Quota(2, 2, 2))).result());
+    control.replay();
+    quotaManager.saveQuota(ROLE, IQuota.build(new Quota(-2.0, 4, 5)));
   }
 
-  private IExpectationSetters<?> returnTasks(IScheduledTask... tasks) {
+  private IExpectationSetters<?> expectTasks(IScheduledTask... tasks) {
     return storageUtil.expectTaskFetch(ACTIVE_QUERY, tasks);
   }
 
-  private IExpectationSetters<?> returnNoTasks() {
-    return returnTasks();
+  private IExpectationSetters<?> expectNoTasks() {
+    return expectTasks();
   }
 
-  private IExpectationSetters<Optional<IQuota>> applyQuota(Quota quota) {
+  private IExpectationSetters<Optional<IQuota>> expectQuota(IQuota quota) {
     return expect(storageUtil.quotaStore.fetchQuota(ROLE))
-        .andReturn(Optional.of(IQuota.build(quota)));
+        .andReturn(Optional.of(quota));
+  }
+
+  private ITaskConfig createTaskConfig(int cpus, int ramMb, int diskMb, boolean production) {
+    return createTask("newTask", "newId", cpus, ramMb, diskMb, production)
+        .getAssignedTask()
+        .getTask();
   }
 
   private IScheduledTask createTask(
@@ -180,23 +233,21 @@ public class QuotaManagerImplTest extends EasyMockTest {
       String taskId,
       int cpus,
       int ramMb,
-      int diskMb) {
+      int diskMb,
+      boolean production) {
 
     return IScheduledTask.build(new ScheduledTask()
         .setStatus(ScheduleStatus.RUNNING)
         .setAssignedTask(
             new AssignedTask()
                 .setTaskId(taskId)
-                .setTask(createTaskConfig(jobName, cpus, ramMb, diskMb))));
-  }
-
-  private TaskConfig createTaskConfig(String jobName, int cpus, int ramMb, int diskMb) {
-    return new TaskConfig()
-        .setOwner(new Identity(ROLE, ROLE))
-        .setJobName(jobName)
-        .setNumCpus(cpus)
-        .setRamMb(ramMb)
-        .setDiskMb(diskMb)
-        .setProduction(true);
+                .setTask(new TaskConfig()
+                    .setOwner(new Identity(ROLE, ROLE))
+                    .setEnvironment(ENV)
+                    .setJobName(jobName)
+                    .setNumCpus(cpus)
+                    .setRamMb(ramMb)
+                    .setDiskMb(diskMb)
+                    .setProduction(production))));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 4eeed38..9291fe6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -70,7 +70,8 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.JobFilter.JobFilterResult;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -100,6 +101,8 @@ import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.hostLimitConstraint;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.validateAndPopulate;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
@@ -130,6 +133,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
   private static final String SLAVE_HOST_1 = "SlaveHost1";
 
+  private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
+  private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
+
   private Driver driver;
   private StateManagerImpl stateManager;
   private Storage storage;
@@ -140,7 +146,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private EventSink eventSink;
   private RescheduleCalculator rescheduleCalculator;
   private ShutdownRegistry shutdownRegistry;
-  private JobFilter jobFilter;
+  private QuotaManager quotaManager;
 
   // TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
   private final AtomicLong idCounter = new AtomicLong();
@@ -155,19 +161,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     driver = createMock(Driver.class);
     clock = new FakeClock();
     eventSink = createMock(EventSink.class);
-    eventSink.post(EasyMock.<PubsubEvent>anyObject());
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     cronScheduler = createMock(CronScheduler.class);
     shutdownRegistry = createMock(ShutdownRegistry.class);
-    jobFilter = createMock(JobFilter.class);
-    expectLastCall().anyTimes();
+    quotaManager = createMock(QuotaManager.class);
 
+    eventSink.post(EasyMock.<PubsubEvent>anyObject());
+    expectLastCall().anyTimes();
     expect(cronScheduler.schedule(anyObject(String.class), anyObject(Runnable.class)))
         .andStubReturn("key");
     expect(cronScheduler.isValidSchedule(anyObject(String.class))).andStubReturn(true);
 
-    expect(jobFilter.filter(anyObject(ITaskConfig.class), anyInt())).andStubReturn(
-        JobFilterResult.pass());
+    expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+        .andStubReturn(ENOUGH_QUOTA);
   }
 
   /**
@@ -187,8 +193,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private void buildScheduler(Storage newStorage) throws Exception {
     this.storage = newStorage;
     storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
         StorageBackfill.backfill(storeProvider, clock);
       }
     });
@@ -208,7 +213,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         immediateManager,
         stateManager,
         taskIdGenerator,
-        jobFilter);
+        quotaManager);
     cron.schedulerCore = scheduler;
     immediateManager.schedulerCore = scheduler;
   }
@@ -1164,8 +1169,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testEnsureCanAddInstances() throws Exception {
     SanitizedConfiguration job = makeJob(KEY_A, 1);
-    expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
-        .andReturn(JobFilterResult.pass());
 
     control.replay();
     buildScheduler();
@@ -1176,8 +1179,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test(expected = ScheduleException.class)
   public void testEnsureCanAddInstancesFails() throws Exception {
     SanitizedConfiguration job = makeJob(KEY_A, 1);
-    expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
-        .andReturn(JobFilterResult.fail("fail"));
+    expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
     buildScheduler();
@@ -1216,8 +1219,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test(expected = ScheduleException.class)
   public void testFilterFailRejectsCreate() throws Exception {
     SanitizedConfiguration job = makeJob(KEY_A, 1);
-    expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
-        .andReturn(JobFilterResult.fail("failed"));
+    expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
 
@@ -1228,7 +1231,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test(expected = ScheduleException.class)
   public void testFilterFailRejectsAddInstances() throws Exception {
     IJobConfiguration job = makeJob(KEY_A, 1).getJobConfig();
-    expect(jobFilter.filter(job.getTaskConfig(), 1)).andReturn(JobFilterResult.fail("failed"));
+    expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
 
@@ -1258,7 +1262,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         .setEnvironment(ENV_A)
         .setJobName(KEY_A.getName())
         .setOwner(OWNER_A);
-    expect(jobFilter.filter(ITaskConfig.build(newTask), 2)).andReturn(JobFilterResult.pass());
     ImmutableSet<Integer> instances = ImmutableSet.of(1);
 
     control.replay();


Mime
View raw message