aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Adding resource consumption calculation for cron jobs.
Date Mon, 03 Nov 2014 18:53:16 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9186f2e52 -> 37cb06335


Adding resource consumption calculation for cron jobs.

Bugs closed: AURORA-825

Reviewed at https://reviews.apache.org/r/27317/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/37cb0633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/37cb0633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/37cb0633

Branch: refs/heads/master
Commit: 37cb06335a834de1894c882810a822f5f2ef69bc
Parents: 9186f2e
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Mon Nov 3 10:53:06 2014 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Mon Nov 3 10:53:06 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/quota/QuotaInfo.java       |  29 +++
 .../aurora/scheduler/quota/QuotaManager.java    | 236 ++++++++++++-------
 .../scheduler/quota/QuotaManagerImplTest.java   | 187 +++++++++++----
 3 files changed, 326 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/37cb0633/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
index d4e0f53..3e25812 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.quota;
 
+import java.util.Objects;
+
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 
 import static java.util.Objects.requireNonNull;
@@ -61,4 +63,31 @@ public class QuotaInfo {
   public IResourceAggregate getNonProdConsumption() {
     return nonProdConsumption;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof QuotaInfo)) {
+      return false;
+    }
+
+    QuotaInfo other = (QuotaInfo) o;
+
+    return Objects.equals(quota, other.quota)
+        && Objects.equals(prodConsumption, other.prodConsumption)
+        && Objects.equals(nonProdConsumption, other.nonProdConsumption);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(quota, prodConsumption, nonProdConsumption);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("quota", quota)
+        .add("prodConsumption", prodConsumption)
+        .add("nonProdConsumption", nonProdConsumption)
+        .toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/37cb0633/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 3fbab8b..e27a74b 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -27,6 +27,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.Sets;
@@ -34,14 +35,16 @@ import com.google.inject.Inject;
 
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ResourceAggregates;
-import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
@@ -55,6 +58,9 @@ import org.apache.aurora.scheduler.updater.JobUpdateController;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.base.Tasks.INFO_TO_JOB_KEY;
+import static org.apache.aurora.scheduler.base.Tasks.IS_PRODUCTION;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_INFO;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 
 /**
@@ -112,10 +118,12 @@ public interface QuotaManager {
    */
   class QuotaManagerImpl implements QuotaManager {
     private final Storage storage;
+    private final CronJobManager cronJobManager;
 
     @Inject
-    QuotaManagerImpl(Storage storage) {
+    QuotaManagerImpl(Storage storage, CronJobManager cronJobManager) {
       this.storage = requireNonNull(storage);
+      this.cronJobManager = requireNonNull(cronJobManager);
     }
 
     @Override
@@ -151,11 +159,11 @@ public interface QuotaManager {
       }
 
       QuotaInfo quotaInfo = getQuotaInfo(template.getJob().getRole());
+      IResourceAggregate requestedTotal = add(
+          quotaInfo.getProdConsumption(),
+          ResourceAggregates.scale(fromTasks(ImmutableSet.of(template)), instances));
 
-      return QuotaCheckResult.greaterOrEqual(
-          quotaInfo.getQuota(),
-          add(quotaInfo.getProdConsumption(), ResourceAggregates.scale(
-              prodResourcesFromTasks(ImmutableSet.of(template)), instances)));
+      return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal);
     }
 
     @Override
@@ -177,11 +185,11 @@ public interface QuotaManager {
     /**
      * Gets QuotaInfo with currently allocated quota and actual consumption data.
      * <p>
-     * In case an optional {@code requestedUpdate} is specified, the production consumption
returned
-     * also includes an estimated resources share of that update as if it was already in
progress.
+     * In case an optional {@code requestedUpdate} is specified, the consumption returned
also
+     * includes an estimated resources share of that update as if it was already in progress.
      *
      * @param role Role to get quota info for.
-     * @param requestedUpdate An optional {@code IJobUpdate} to forecast the prod consumption.
+     * @param requestedUpdate An optional {@code IJobUpdate} to forecast the consumption.
      * @return {@code QuotaInfo} with quota and consumption details.
      */
     private QuotaInfo getQuotaInfo(final String role, final Optional<IJobUpdate> requestedUpdate)
{
@@ -191,12 +199,23 @@ public interface QuotaManager {
           FluentIterable<IScheduledTask> tasks = FluentIterable.from(
               storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
 
-          IResourceAggregate prodConsumed =
-              getProdConsumption(storeProvider.getJobUpdateStore(), role, tasks, requestedUpdate);
+          Map<IJobKey, IJobUpdate> updates = Maps.newHashMap(
+              fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role)
+                  .uniqueIndex(UPDATE_TO_JOB_KEY));
 
-          // TODO(maxim): Consider a similar update-aware approach for computing nonProdConsumed.
-          IResourceAggregate nonProdConsumed = fromTasks(
-              tasks.transform(Tasks.SCHEDULED_TO_INFO).filter(Predicates.not(Tasks.IS_PRODUCTION)));
+          // Mix in a requested job update (if present) to correctly calculate consumption.
+          // This would be an update that is not saved in the store yet (i.e. the one quota
is
+          // checked for).
+          if (requestedUpdate.isPresent()) {
+            updates.put(requestedUpdate.get().getSummary().getJobKey(), requestedUpdate.get());
+          }
+
+          Map<IJobKey, IJobConfiguration> cronTemplates =
+              FluentIterable.from(cronJobManager.getJobs()).uniqueIndex(JobKeys.FROM_CONFIG);
+
+          IResourceAggregate prodConsumed = getConsumption(tasks, updates, cronTemplates,
true);
+
+          IResourceAggregate nonProdConsumed = getConsumption(tasks, updates, cronTemplates,
false);
 
           IResourceAggregate quota =
               storeProvider.getQuotaStore().fetchQuota(role).or(ResourceAggregates.none());
@@ -206,47 +225,99 @@ public interface QuotaManager {
       });
     }
 
-    private IResourceAggregate getProdConsumption(
-        JobUpdateStore jobUpdateStore,
-        String role,
+    private IResourceAggregate getConsumption(
         FluentIterable<IScheduledTask> tasks,
-        Optional<IJobUpdate> requestedUpdate) {
-
-      // The algorithm here is as follows:
-      // 1. Load all production active tasks that belong to jobs without active updates OR
-      //    unaffected by an active update working set. An example of the latter would be
instances
-      //    not updated by the update due to being already in desired state or outside of
update
-      //    range (e.g. not in JobUpdateInstructions.updateOnlyTheseInstances).
-      //    Calculate consumed resources as "nonUpdateConsumption".
-      //
-      // 2. Mix in a requested job update (if present) to correctly calculate prod consumption.
-      //    This would be an update that is not saved in the store yet (i.e. the one quota
is
-      //    checked for).
+        Map<IJobKey, IJobUpdate> updatesByKey,
+        Map<IJobKey, IJobConfiguration> cronTemplatesByKey,
+        boolean isProd) {
+
+      Predicate<ITaskConfig> prodFilter = isProd ? IS_PRODUCTION : Predicates.not(IS_PRODUCTION);
+
+      FluentIterable<IScheduledTask> filteredTasks =
+          tasks.filter(Predicates.compose(prodFilter, SCHEDULED_TO_INFO));
+
+      IResourceAggregate nonCronConsumption = getNonCronConsumption(
+          updatesByKey,
+          excludeCronTasks(filteredTasks, cronTemplatesByKey),
+          isProd);
+
+      IResourceAggregate cronConsumption =
+          getCronConsumption(cronTemplatesByKey, filteredTasks, isProd);
+
+      return add(nonCronConsumption, cronConsumption);
+    }
+
+    private static IResourceAggregate getNonCronConsumption(
+        Map<IJobKey, IJobUpdate> updatesByKey,
+        FluentIterable<IScheduledTask> tasks,
+        boolean isProd) {
+
+      // 1. Get all active tasks that belong to jobs without active updates OR unaffected
by an
+      //    active update working set. An example of the latter would be instances not updated
by
+      //    the update due to being already in desired state or outside of update range (e.g.
+      //    not in JobUpdateInstructions.updateOnlyTheseInstances). Calculate consumed resources
+      //    as "nonUpdateConsumption".
       //
-      // 3. Calculate consumed resources from instances affected by the active job updates
as
+      // 2. Calculate consumed resources from instances affected by the active job updates
as
       //    "updateConsumption".
       //
-      // 4. Add up the two to yield total prod consumption.
+      // 3. Add up the two to yield total consumption.
 
-      Map<IJobKey, IJobUpdate> updatesByKey = Maps.newHashMap(
-          fetchActiveJobUpdates(jobUpdateStore, role).uniqueIndex(UPDATE_TO_JOB_KEY));
-
-      if (requestedUpdate.isPresent()) {
-        updatesByKey.put(requestedUpdate.get().getSummary().getJobKey(), requestedUpdate.get());
-      }
-
-      IResourceAggregate nonUpdateConsumption = prodResourcesFromTasks(tasks
+      IResourceAggregate nonUpdateConsumption = fromTasks(tasks
           .filter(buildNonUpdatingTasksFilter(updatesByKey))
-          .transform(Tasks.SCHEDULED_TO_INFO));
+          .transform(SCHEDULED_TO_INFO));
 
       IResourceAggregate updateConsumption = ResourceAggregates.EMPTY;
       for (IJobUpdate update : updatesByKey.values()) {
-        updateConsumption = add(updateConsumption, toProdResources(update.getInstructions()));
+        updateConsumption =
+            add(updateConsumption, instructionsToResources(update.getInstructions(), isProd));
       }
 
       return add(nonUpdateConsumption, updateConsumption);
     }
 
+    private static IResourceAggregate getCronConsumption(
+        Map<IJobKey, IJobConfiguration> cronTemplates,
+        FluentIterable<IScheduledTask> tasks,
+        boolean isProd) {
+
+      // Calculate the overall cron consumption as MAX between cron template resources and
active
+      // cron tasks. This is required to account for a case when a running cron task has
higher
+      // resource requirements than its updated template.
+      //
+      // While this is the "worst case" calculation that does not account for a possible
"staggered"
+      // cron scheduling, it's the simplest approach possible given the system constraints
(e.g.:
+      // lack of enforcement on a cron job run duration).
+
+      Multimap<IJobKey, ITaskConfig> taskConfigsByKey =
+          tasks.transform(SCHEDULED_TO_INFO).index(INFO_TO_JOB_KEY);
+
+      IResourceAggregate totalConsumption = ResourceAggregates.EMPTY;
+      for (IJobConfiguration config : cronTemplates.values()) {
+        if (isProd == config.getTaskConfig().isProduction()) {
+          IResourceAggregate templateConsumption = ResourceAggregates.scale(
+              fromTasks(ImmutableSet.of(config.getTaskConfig())), config.getInstanceCount());
+
+          IResourceAggregate taskConsumption = fromTasks(taskConfigsByKey.get(config.getKey()));
+
+          totalConsumption = add(totalConsumption, max(templateConsumption, taskConsumption));
+        }
+      }
+      return totalConsumption;
+    }
+
+    private static FluentIterable<IScheduledTask> excludeCronTasks(
+        FluentIterable<IScheduledTask> tasks,
+        final Map<IJobKey, IJobConfiguration> cronJobs) {
+
+      return tasks.filter(new Predicate<IScheduledTask>() {
+        @Override
+        public boolean apply(IScheduledTask input) {
+          return !cronJobs.containsKey(input.getAssignedTask().getTask().getJob());
+        }
+      });
+    }
+
     private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter(
         final Map<IJobKey, IJobUpdate> roleJobUpdates) {
 
@@ -303,53 +374,68 @@ public interface QuotaManager {
       return builder.build();
     }
 
-    private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(a.getNumCpus() + b.getNumCpus())
-          .setRamMb(a.getRamMb() + b.getRamMb())
-          .setDiskMb(a.getDiskMb() + b.getDiskMb()));
-    }
-
     /**
-     * This function calculates max aggregate production resources consumed by the job update
+     * This function calculates max aggregate resources consumed by the job update
      * {@code instructions}. The max is calculated between existing and desired task configs
on per
      * resource basis. This means max CPU, RAM and DISK values are computed individually
and may
      * come from different task configurations. While it may not be the most accurate
      * representation of job update resources during the update, it does guarantee none of
the
      * individual resource values is exceeded during the forward/back roll.
      *
+     * NOTE: In case of a job update converting the job production bit (i.e. prod -> non-prod
or
+     *       non-prod -> prod), only the matching state is counted towards consumption.
For example,
+     *       prod -> non-prod AND {@code prodConsumption=True}: only the initial state
is accounted.
+     *
      * @param instructions Update instructions with resource definitions.
+     * @param isProd Flag indicating whether the prod or non-prod calculation requested.
      * @return Resources consumed by the update.
      */
-    private static IResourceAggregate toProdResources(IJobUpdateInstructions instructions)
{
-      double existingCpu = 0;
-      int existingRamMb = 0;
-      int existingDiskMb = 0;
+    private static IResourceAggregate instructionsToResources(
+        IJobUpdateInstructions instructions,
+        final boolean isProd) {
+
+      // Calculate initial state consumption.
+      IResourceAggregate initial = ResourceAggregates.EMPTY;
       for (IInstanceTaskConfig group : instructions.getInitialState()) {
         ITaskConfig task = group.getTask();
-        if (task.isProduction()) {
+        if (isProd == task.isProduction()) {
           for (IRange range : group.getInstances()) {
-            int numInstances = range.getLast() - range.getFirst() + 1;
-            existingCpu += task.getNumCpus() * numInstances;
-            existingRamMb += task.getRamMb() * numInstances;
-            existingDiskMb += task.getDiskMb() * numInstances;
+            initial = add(initial, ResourceAggregates.scale(
+                fromTasks(ImmutableSet.of(task)),
+                instanceCountFromRange(range)));
           }
         }
       }
 
-      // Calculate desired prod task consumption.
+      // Calculate desired state consumption.
       IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
-          .transform(TO_PROD_RESOURCES).or(ResourceAggregates.EMPTY);
+          .transform(new Function<IInstanceTaskConfig, IResourceAggregate>() {
+            @Override
+            public IResourceAggregate apply(IInstanceTaskConfig input) {
+              return isProd == input.getTask().isProduction()
+                  ? ResourceAggregates.scale(
+                  fromTasks(ImmutableSet.of(input.getTask())),
+                  getUpdateInstanceCount(input.getInstances()))
+                  : ResourceAggregates.EMPTY;
+            }
+          }).or(ResourceAggregates.EMPTY);
+
+      // Calculate result as max(existing, desired) per resource type.
+      return max(initial, desired);
+    }
 
-      // Calculate result as max(existing, desired) per resource.
+    private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
       return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(Math.max(existingCpu, desired.getNumCpus()))
-          .setRamMb(Math.max(existingRamMb, desired.getRamMb()))
-          .setDiskMb(Math.max(existingDiskMb, desired.getDiskMb())));
+          .setNumCpus(a.getNumCpus() + b.getNumCpus())
+          .setRamMb(a.getRamMb() + b.getRamMb())
+          .setDiskMb(a.getDiskMb() + b.getDiskMb()));
     }
 
-    private static IResourceAggregate prodResourcesFromTasks(Iterable<ITaskConfig>
tasks) {
-      return fromTasks(FluentIterable.from(tasks).filter(Tasks.IS_PRODUCTION));
+    private static IResourceAggregate max(IResourceAggregate a, IResourceAggregate b) {
+      return IResourceAggregate.build(new ResourceAggregate()
+          .setNumCpus(Math.max(a.getNumCpus(), b.getNumCpus()))
+          .setRamMb(Math.max(a.getRamMb(), b.getRamMb()))
+          .setDiskMb(Math.max(a.getDiskMb(), b.getDiskMb())));
     }
 
     private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
@@ -368,18 +454,6 @@ public interface QuotaManager {
           .setDiskMb(diskMb));
     }
 
-    private static final Function<IInstanceTaskConfig, IResourceAggregate> TO_PROD_RESOURCES
=
-        new Function<IInstanceTaskConfig, IResourceAggregate>() {
-          @Override
-          public IResourceAggregate apply(IInstanceTaskConfig input) {
-            return input.getTask().isProduction()
-                ? ResourceAggregates.scale(
-                prodResourcesFromTasks(ImmutableSet.of(input.getTask())),
-                getUpdateInstanceCount(input.getInstances()))
-                : ResourceAggregates.EMPTY;
-          }
-        };
-
     private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
         new Function<IJobUpdate, IJobKey>() {
           @Override
@@ -391,10 +465,14 @@ public interface QuotaManager {
     private static int getUpdateInstanceCount(Set<IRange> ranges) {
       int instanceCount = 0;
       for (IRange range : ranges) {
-        instanceCount += range.getLast() - range.getFirst() + 1;
+        instanceCount += instanceCountFromRange(range);
       }
 
       return instanceCount;
     }
+
+    private static int instanceCountFromRange(IRange range) {
+      return range.getLast() - range.getFirst() + 1;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/37cb0633/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 2fe2574..4c4482c 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -24,6 +24,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateInstructions;
@@ -34,10 +35,11 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ResourceAggregates;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
@@ -69,12 +71,14 @@ public class QuotaManagerImplTest extends EasyMockTest {
   private StorageTestUtil storageUtil;
   private JobUpdateStore jobUpdateStore;
   private QuotaManagerImpl quotaManager;
+  private CronJobManager cronJobManager;
 
   @Before
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
     jobUpdateStore = storageUtil.jobUpdateStore;
-    quotaManager = new QuotaManagerImpl(storageUtil.storage);
+    cronJobManager = createMock(CronJobManager.class);
+    quotaManager = new QuotaManagerImpl(storageUtil.storage, cronJobManager);
     storageUtil.expectOperations();
   }
 
@@ -87,16 +91,35 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(quota);
     expectTasks(prodTask, nonProdTask);
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+    expectCronJobs(
+        createJob(createProdTask("pc", 1, 1, 1), 2),
+        createJob(createNonProdTask("npc", 7, 7, 7), 1));
 
     control.replay();
 
-    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
-    assertEquals(quota, quotaInfo.getQuota());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)), quotaInfo.getProdConsumption());
+        new QuotaInfo(from(4, 4, 4), from(6, 6, 6), from(9, 9, 9)),
+        quotaManager.getQuotaInfo(ROLE));
+  }
+
+  @Test
+  public void testGetQuotaInfoWithCronTasks() {
+    IScheduledTask prodTask = createProdTask("pc", 6, 6, 6);
+    IScheduledTask nonProdTask = createProdTask("npc", 7, 7, 7);
+    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
+
+    expectQuota(quota);
+    expectTasks(prodTask, nonProdTask);
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+    expectCronJobs(
+        createJob(createProdTask("pc", 3, 3, 3), 1),
+        createJob(createNonProdTask("npc", 5, 5, 5), 2));
+
+    control.replay();
+
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
-        quotaInfo.getNonProdConsumption());
+        new QuotaInfo(from(4, 4, 4), from(7, 7, 7), from(10, 10, 10)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -110,35 +133,30 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(quota);
     expectTasks(prodTask, updatingProdTask, updatingFilteredProdTask, nonProdTask);
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+    expectNoCronJobs();
 
     control.replay();
 
-    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
-    assertEquals(quota, quotaInfo.getQuota());
-
     // Expected consumption from: prodTask + updatingProdTask + job update.
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(7, 7, 7)), quotaInfo.getProdConsumption());
-
-    assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
-        quotaInfo.getNonProdConsumption());
+        new QuotaInfo(from(4, 4, 4), from(7, 7, 7), from(2, 2, 2)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
-  public void testGetQuotaInfoNoTasksNoUpdates() {
+  public void testGetQuotaInfoNoTasksNoUpdatesNoCronJobs() {
     IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
 
     expectQuota(quota);
     expectNoTasks();
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
-    QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
-    assertEquals(quota, quotaInfo.getQuota());
-    assertEquals(ResourceAggregates.none(), quotaInfo.getProdConsumption());
-    assertEquals(ResourceAggregates.none(), quotaInfo.getNonProdConsumption());
+    assertEquals(
+        new QuotaInfo(from(4, 4, 4), from(0, 0, 0), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -146,6 +164,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 2, 2, 2));
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+    expectNoCronJobs();
 
     control.replay();
 
@@ -158,6 +177,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectNoTasks();
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+    expectNoCronJobs();
 
     control.replay();
 
@@ -170,6 +190,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 2, 2, 2));
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -182,6 +203,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectNoTasks();
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -195,6 +217,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createTask("bar", "id2", 5, 5, 5, false,
0));
 
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -217,6 +240,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     expectNoTasks();
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -229,6 +253,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 3, 3, 3));
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -242,6 +267,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 3, 3, 3));
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -255,6 +281,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 3, 3, 3));
     expectNoJobUpdates();
+    expectNoCronJobs();
 
     control.replay();
 
@@ -264,20 +291,39 @@ public class QuotaManagerImplTest extends EasyMockTest {
   }
 
   @Test
+  public void testCheckQuotaExceedsCron() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+    expectNoTasks().times(2);
+    expectNoJobUpdates().times(2);
+    expectCronJobs(
+        createJob(createProdTask("pc", 4, 4, 4), 1),
+        createJob(createNonProdTask("npc", 7, 7, 7), 1)).times(2);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(2, 2, 2,
true), 1);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(7, 7, 7)),
+        quotaManager.getQuotaInfo(ROLE));
+  }
+
+  @Test
   public void testCheckQuotaUpdatingTasksFilteredOut() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createTask(JOB_NAME, "id2", 3, 3, 3, true,
0))
         .times(2);
 
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(2, 2, 2, true), 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -286,14 +332,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
 
     expectJobUpdates(taskConfig(8, 8, 8, false), taskConfig(4, 4, 4, false), 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(8, 8, 8)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -301,15 +348,16 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 1, 1, 1)).times(2);
 
-    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(4, 4, 4, false), 2);
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(7, 7, 7, false), 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(7, 7, 7)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -318,14 +366,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
 
     expectJobUpdates(taskConfig(1, 1, 1, false), taskConfig(1, 1, 1, true), 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(5, 5, 5)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(5, 5, 5), from(5, 5, 5), from(1, 1, 1)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -333,14 +382,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
     expectJobUpdates(taskConfig(2, 2, 2, true), taskConfig(1, 1, 1, true), 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -348,14 +398,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
     expectJobUpdates(taskConfig(1, 1, 1, true), 1, taskConfig(1, 1, 1, true), 2, 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -363,14 +414,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
     expectJobUpdates(taskConfig(1, 1, 1, true), 2, taskConfig(1, 1, 1, true), 1, 2);
+    expectNoCronJobs().times(2);
 
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -391,13 +443,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expect(jobUpdateStore.fetchJobUpdate(updateId))
         .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -418,13 +472,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expect(jobUpdateStore.fetchJobUpdate(updateId))
         .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -445,13 +501,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expect(jobUpdateStore.fetchJobUpdate(updateId))
         .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1,
true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -471,13 +529,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
         config,
         1);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -494,13 +554,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
         config,
         3);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -520,13 +582,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
         config,
         1);
 
+    expectNoCronJobs().times(2);
+
     control.replay();
 
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
-        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+        new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+        quotaManager.getQuotaInfo(ROLE));
   }
 
   @Test
@@ -648,6 +712,19 @@ public class QuotaManagerImplTest extends EasyMockTest {
     return expectTasks();
   }
 
+  private IExpectationSetters<?> expectNoCronJobs() {
+    return expect(cronJobManager.getJobs()).andReturn(ImmutableSet.<IJobConfiguration>of());
+  }
+
+  private IExpectationSetters<?> expectCronJobs(IJobConfiguration... jobs) {
+    ImmutableSet.Builder<IJobConfiguration> builder = ImmutableSet.builder();
+    for (IJobConfiguration job : jobs) {
+      builder.add(job);
+    }
+
+    return expect(cronJobManager.getJobs()).andReturn(builder.build());
+  }
+
   private IExpectationSetters<Optional<IResourceAggregate>> expectQuota(IResourceAggregate
quota) {
     return expect(storageUtil.quotaStore.fetchQuota(ROLE))
         .andReturn(Optional.of(quota));
@@ -663,6 +740,10 @@ public class QuotaManagerImplTest extends EasyMockTest {
     return createTask(jobName, jobName + "id1", cpus, ramMb, diskMb, true, 0);
   }
 
+  private IScheduledTask createNonProdTask(String jobName, int cpus, int ramMb, int diskMb)
{
+    return createTask(jobName, jobName + "id1", cpus, ramMb, diskMb, false, 0);
+  }
+
   private IScheduledTask createTask(
       String jobName,
       String taskId,
@@ -688,4 +769,16 @@ public class QuotaManagerImplTest extends EasyMockTest {
                     .setDiskMb(diskMb)
                     .setProduction(production))));
   }
+
+  private IJobConfiguration createJob(IScheduledTask scheduledTask, int instanceCount) {
+    TaskConfig task = scheduledTask.newBuilder().getAssignedTask().getTask();
+    return IJobConfiguration.build(new JobConfiguration()
+        .setKey(task.getJob())
+        .setTaskConfig(task)
+        .setInstanceCount(instanceCount));
+  }
+
+  private static IResourceAggregate from(double cpu, int ramMb, int diskMb) {
+    return IResourceAggregate.build(new ResourceAggregate(cpu, ramMb, diskMb));
+  }
 }


Mime
View raw message