aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject aurora git commit: Refine types used in QuotaManager, share more functions/predicates.
Date Wed, 01 Apr 2015 17:24:34 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 14e7b84f4 -> 4b9c759cf


Refine types used in QuotaManager, share more functions/predicates.

Reviewed at https://reviews.apache.org/r/32371/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4b9c759c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4b9c759c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4b9c759c

Branch: refs/heads/master
Commit: 4b9c759cf3868b1b89e5411fd7ed782d2e5f81e0
Parents: 14e7b84
Author: Bill Farner <wfarner@apache.org>
Authored: Wed Apr 1 10:22:18 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Wed Apr 1 10:22:18 2015 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/quota/QuotaManager.java    | 306 ++++++++++---------
 .../aurora/scheduler/updater/UpdateFactory.java |  10 +-
 .../aurora/scheduler/updater/Updates.java       |  22 ++
 3 files changed, 183 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 39e930c..7453680 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import java.util.List;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 
@@ -24,13 +24,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
-import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.ResourceAggregate;
@@ -40,6 +38,7 @@ import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -49,17 +48,19 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.updater.Updates;
 
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.scheduler.base.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_INFO;
+import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_JOB_KEY;
 import static org.apache.aurora.scheduler.base.Tasks.INFO_TO_JOB_KEY;
 import static org.apache.aurora.scheduler.base.Tasks.IS_PRODUCTION;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_INFO;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds;
 
 /**
  * Allows access to resource quotas, and tracks quota consumption.
@@ -81,7 +82,7 @@ public interface QuotaManager {
    *
    * @param role Quota owner.
    * @param storeProvider A store provider to access quota data.
-   * @return {@code QuotaInfo} instance.
+   * @return quota usage information for the given role.
    */
   QuotaInfo getQuotaInfo(String role, StoreProvider storeProvider);
 
@@ -93,7 +94,7 @@ public interface QuotaManager {
    * @param template Task resource requirement.
    * @param instances Number of additional instances requested.
    * @param storeProvider A store provider to access quota data.
-   * @return {@code QuotaComparisonResult} instance with quota check result details.
+   * @return quota check result details.
    */
   QuotaCheckResult checkInstanceAddition(
       ITaskConfig template,
@@ -106,7 +107,7 @@ public interface QuotaManager {
    *
    * @param jobUpdate Job update to check quota for.
    * @param storeProvider A store provider to access quota data.
-   * @return {@code QuotaComparisonResult} instance with quota check result details.
+   * @return quota check result details.
    */
   QuotaCheckResult checkJobUpdate(IJobUpdate jobUpdate, StoreProvider storeProvider);
 
@@ -116,7 +117,7 @@ public interface QuotaManager {
    *
    * @param cronConfig Cron job configuration.
    * @param storeProvider A store provider to access quota data.
-   * @return{@code QuotaComparisonResult} instance with quota check result details.
+   * @return quota check result details.
    */
   QuotaCheckResult checkCronUpdate(IJobConfiguration cronConfig, StoreProvider storeProvider);
 
@@ -233,18 +234,20 @@ public interface QuotaManager {
         Optional<IJobUpdate> requestedUpdate,
         StoreProvider storeProvider) {
 
-      FluentIterable<IScheduledTask> tasks = FluentIterable.from(
-          storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
+      FluentIterable<IAssignedTask> tasks = FluentIterable
+          .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
+          .transform(SCHEDULED_TO_ASSIGNED);
 
-      Map<IJobKey, IJobUpdate> updates = Maps.newHashMap(
-          fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role)
-              .uniqueIndex(UPDATE_TO_JOB_KEY));
+      Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap(
+          fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role));
 
       // Mix in a requested job update (if present) to correctly calculate consumption.
       // This would be an update that is not saved in the store yet (i.e. the one quota is
       // checked for).
       if (requestedUpdate.isPresent()) {
-        updates.put(requestedUpdate.get().getSummary().getKey().getJob(), requestedUpdate.get());
+        updates.put(
+            requestedUpdate.get().getSummary().getKey().getJob(),
+            requestedUpdate.get().getInstructions());
       }
 
       Map<IJobKey, IJobConfiguration> cronTemplates =
@@ -262,32 +265,47 @@ public interface QuotaManager {
       return new QuotaInfo(quota, prodConsumed, nonProdConsumed);
     }
 
+    private static final Function<IJobConfiguration, ITaskConfig> JOB_TO_TASK =
+        new Function<IJobConfiguration, ITaskConfig>() {
+          @Override
+          public ITaskConfig apply(IJobConfiguration job) {
+            return job.getTaskConfig();
+          }
+        };
+
     private IResourceAggregate getConsumption(
-        FluentIterable<IScheduledTask> tasks,
-        Map<IJobKey, IJobUpdate> updatesByKey,
+        FluentIterable<IAssignedTask> tasks,
+        Map<IJobKey, IJobUpdateInstructions> updatesByKey,
         Map<IJobKey, IJobConfiguration> cronTemplatesByKey,
         boolean isProd) {
 
       Predicate<ITaskConfig> prodFilter = isProd ? IS_PRODUCTION : Predicates.not(IS_PRODUCTION);
 
-      FluentIterable<IScheduledTask> filteredTasks =
-          tasks.filter(Predicates.compose(prodFilter, SCHEDULED_TO_INFO));
+      FluentIterable<IAssignedTask> filteredTasks =
+          tasks.filter(Predicates.compose(prodFilter, ASSIGNED_TO_INFO));
+
+      Predicate<IAssignedTask> excludeCron = Predicates.compose(
+          Predicates.not(Predicates.in(cronTemplatesByKey.keySet())),
+          ASSIGNED_TO_JOB_KEY);
 
       IResourceAggregate nonCronConsumption = getNonCronConsumption(
           updatesByKey,
-          excludeCronTasks(filteredTasks, cronTemplatesByKey),
-          isProd);
+          filteredTasks.filter(excludeCron),
+          prodFilter);
 
-      IResourceAggregate cronConsumption =
-          getCronConsumption(cronTemplatesByKey, filteredTasks, isProd);
+      IResourceAggregate cronConsumption = getCronConsumption(
+          Iterables.filter(
+              cronTemplatesByKey.values(),
+              Predicates.compose(prodFilter, JOB_TO_TASK)),
+          filteredTasks.transform(ASSIGNED_TO_INFO));
 
       return add(nonCronConsumption, cronConsumption);
     }
 
     private static IResourceAggregate getNonCronConsumption(
-        Map<IJobKey, IJobUpdate> updatesByKey,
-        FluentIterable<IScheduledTask> tasks,
-        boolean isProd) {
+        Map<IJobKey, IJobUpdateInstructions> updatesByKey,
+        FluentIterable<IAssignedTask> tasks,
+        final Predicate<ITaskConfig> configFilter) {
 
       // 1. Get all active tasks that belong to jobs without active updates OR unaffected by an
       //    active update working set. An example of the latter would be instances not updated by
@@ -302,21 +320,20 @@ public interface QuotaManager {
 
       IResourceAggregate nonUpdateConsumption = fromTasks(tasks
           .filter(buildNonUpdatingTasksFilter(updatesByKey))
-          .transform(SCHEDULED_TO_INFO));
+          .transform(ASSIGNED_TO_INFO));
 
-      IResourceAggregate updateConsumption = EMPTY;
-      for (IJobUpdate update : updatesByKey.values()) {
-        updateConsumption =
-            add(updateConsumption, instructionsToResources(update.getInstructions(), isProd));
-      }
+      final Predicate<IInstanceTaskConfig> instanceFilter =
+          Predicates.compose(configFilter, INSTANCE_CONFIG);
+
+      IResourceAggregate updateConsumption =
+          addAll(Iterables.transform(updatesByKey.values(), updateResources(instanceFilter)));
 
       return add(nonUpdateConsumption, updateConsumption);
     }
 
     private static IResourceAggregate getCronConsumption(
-        Map<IJobKey, IJobConfiguration> cronTemplates,
-        FluentIterable<IScheduledTask> tasks,
-        boolean isProd) {
+        Iterable<IJobConfiguration> cronTemplates,
+        FluentIterable<ITaskConfig> tasks) {
 
       // Calculate the overall cron consumption as MAX between cron template resources and active
       // cron tasks. This is required to account for a case when a running cron task has higher
@@ -326,52 +343,36 @@ public interface QuotaManager {
       // cron scheduling, it's the simplest approach possible given the system constraints (e.g.:
       // lack of enforcement on a cron job run duration).
 
-      Multimap<IJobKey, ITaskConfig> taskConfigsByKey =
-          tasks.transform(SCHEDULED_TO_INFO).index(INFO_TO_JOB_KEY);
-
-      IResourceAggregate totalConsumption = EMPTY;
-      for (IJobConfiguration config : cronTemplates.values()) {
-        if (isProd == config.getTaskConfig().isProduction()) {
-          IResourceAggregate templateConsumption =
-              scale(config.getTaskConfig(), config.getInstanceCount());
-
-          IResourceAggregate taskConsumption = fromTasks(taskConfigsByKey.get(config.getKey()));
-
-          totalConsumption = add(totalConsumption, max(templateConsumption, taskConsumption));
-        }
-      }
-      return totalConsumption;
-    }
-
-    private static FluentIterable<IScheduledTask> excludeCronTasks(
-        FluentIterable<IScheduledTask> tasks,
-        final Map<IJobKey, IJobConfiguration> cronJobs) {
-
-      return tasks.filter(new Predicate<IScheduledTask>() {
-        @Override
-        public boolean apply(IScheduledTask input) {
-          return !cronJobs.containsKey(input.getAssignedTask().getTask().getJob());
-        }
-      });
+      final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(INFO_TO_JOB_KEY);
+      return addAll(Iterables.transform(
+          cronTemplates,
+          new Function<IJobConfiguration, IResourceAggregate>() {
+            @Override
+            public IResourceAggregate apply(IJobConfiguration config) {
+              return max(
+                  scale(config.getTaskConfig(), config.getInstanceCount()),
+                  fromTasks(taskConfigsByKey.get(config.getKey())));
+            }
+          }));
     }
 
-    private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter(
-        final Map<IJobKey, IJobUpdate> roleJobUpdates) {
+    private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter(
+        final Map<IJobKey, IJobUpdateInstructions> roleJobUpdates) {
 
-      return new Predicate<IScheduledTask>() {
+      return new Predicate<IAssignedTask>() {
         @Override
-        public boolean apply(IScheduledTask input) {
-          Optional<IJobUpdate> update = Optional.fromNullable(
-              roleJobUpdates.get(input.getAssignedTask().getTask().getJob()));
+        public boolean apply(IAssignedTask task) {
+          Optional<IJobUpdateInstructions> update = Optional.fromNullable(
+              roleJobUpdates.get(task.getTask().getJob()));
 
           if (update.isPresent()) {
-            IJobUpdateInstructions instructions = update.get().getInstructions();
-            RangeSet<Integer> initialInstances = instanceRangeSet(instructions.getInitialState());
-            RangeSet<Integer> desiredInstances = instanceRangeSet(instructions.isSetDesiredState()
+            IJobUpdateInstructions instructions = update.get();
+            RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState());
+            RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState()
                 ? ImmutableSet.of(instructions.getDesiredState())
                 : ImmutableSet.<IInstanceTaskConfig>of());
 
-            int instanceId = input.getAssignedTask().getInstanceId();
+            int instanceId = task.getInstanceId();
             return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId);
           }
           return true;
@@ -379,18 +380,31 @@ public interface QuotaManager {
       };
     }
 
-    private static FluentIterable<IJobUpdate> fetchActiveJobUpdates(
-        JobUpdateStore jobUpdateStore,
-        String role) {
+    private static final Function<IJobUpdate, IJobUpdateInstructions> UPDATE_TO_INSTRUCTIONS
=
+        new Function<IJobUpdate, IJobUpdateInstructions>() {
+          @Override
+          public IJobUpdateInstructions apply(IJobUpdate update) {
+            return update.getInstructions();
+          }
+        };
 
-      List<IJobUpdateSummary> summaries = jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role));
+    private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates(
+        final JobUpdateStore jobUpdateStore,
+        String role) {
 
-      Set<IJobUpdate> updates = Sets.newHashSet();
-      for (IJobUpdateSummary summary : summaries) {
-        updates.add(jobUpdateStore.fetchJobUpdate(summary.getKey()).get());
-      }
+      Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
+          new Function<IJobUpdateSummary, IJobUpdate>() {
+            @Override
+            public IJobUpdate apply(IJobUpdateSummary summary) {
+              return jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
+            }
+          };
 
-      return FluentIterable.from(updates);
+      return Maps.transformValues(
+          FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
+              .transform(fetchUpdate)
+              .uniqueIndex(UPDATE_TO_JOB_KEY),
+          UPDATE_TO_INSTRUCTIONS);
     }
 
     @VisibleForTesting
@@ -400,68 +414,84 @@ public interface QuotaManager {
           .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
     }
 
-    private static RangeSet<Integer> instanceRangeSet(Set<IInstanceTaskConfig>
configs) {
-      ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder();
-      for (IInstanceTaskConfig config : configs) {
-        for (IRange range : config.getInstances()) {
-          builder.add(Range.closed(range.getFirst(), range.getLast()));
-        }
-      }
+    private static final Function<IInstanceTaskConfig, ITaskConfig> INSTANCE_CONFIG
=
+        new Function<IInstanceTaskConfig, ITaskConfig>() {
+          @Override
+          public ITaskConfig apply(IInstanceTaskConfig config) {
+            return config.getTask();
+          }
+        };
+
+    private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES
=
+        new Function<ITaskConfig, IResourceAggregate>() {
+          @Override
+          public IResourceAggregate apply(ITaskConfig config) {
+            return IResourceAggregate.build(new ResourceAggregate()
+                .setNumCpus(config.getNumCpus())
+                .setRamMb(config.getRamMb())
+                .setDiskMb(config.getDiskMb()));
+          }
+        };
+
+    private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES
=
+        new Function<IInstanceTaskConfig, IResourceAggregate>() {
+          @Override
+          public IResourceAggregate apply(IInstanceTaskConfig config) {
+            return scale(config.getTask(), getUpdateInstanceCount(config.getInstances()));
+          }
+        };
+
+    private static IResourceAggregate instructionsToResources(
+        Iterable<IInstanceTaskConfig> instructions) {
 
-      return builder.build();
+      return addAll(FluentIterable.from(instructions).transform(INSTANCE_RESOURCES));
     }
 
     /**
-     * This function calculates max aggregate resources consumed by the job update
+     * Calculates max aggregate resources consumed by the job update
      * {@code instructions}. The max is calculated between existing and desired task configs on per
      * resource basis. This means max CPU, RAM and DISK values are computed individually and may
      * come from different task configurations. While it may not be the most accurate
      * representation of job update resources during the update, it does guarantee none of the
      * individual resource values is exceeded during the forward/back roll.
-     *
+     * <p/>
      * NOTE: In case of a job update converting the job production bit (i.e. prod -> non-prod or
      *       non-prod -> prod), only the matching state is counted towards consumption. For example,
      *       prod -> non-prod AND {@code prodConsumption=True}: only the initial state is accounted.
-     *
-     * @param instructions Update instructions with resource definitions.
-     * @param isProd Flag indicating whether the prod or non-prod calculation requested.
-     * @return Resources consumed by the update.
      */
-    private static IResourceAggregate instructionsToResources(
-        IJobUpdateInstructions instructions,
-        final boolean isProd) {
-
-      // Calculate initial state consumption.
-      IResourceAggregate initial = EMPTY;
-      for (IInstanceTaskConfig group : instructions.getInitialState()) {
-        ITaskConfig task = group.getTask();
-        if (isProd == task.isProduction()) {
-          for (IRange range : group.getInstances()) {
-            initial = add(initial, scale(task, instanceCountFromRange(range)));
-          }
-        }
-      }
-
-      // Calculate desired state consumption.
-      IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
-          .transform(new Function<IInstanceTaskConfig, IResourceAggregate>() {
-            @Override
-            public IResourceAggregate apply(IInstanceTaskConfig input) {
-              return isProd == input.getTask().isProduction()
-                  ? scale(input.getTask(), getUpdateInstanceCount(input.getInstances()))
-                  : EMPTY;
-            }
-          }).or(EMPTY);
+    private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources(
+        final Predicate<IInstanceTaskConfig> instanceFilter) {
 
-      // Calculate result as max(existing, desired) per resource type.
-      return max(initial, desired);
+      return new Function<IJobUpdateInstructions, IResourceAggregate>() {
+        @Override
+        public IResourceAggregate apply(IJobUpdateInstructions instructions) {
+          Iterable<IInstanceTaskConfig> initialState =
+              Iterables.filter(instructions.getInitialState(), instanceFilter);
+          Iterable<IInstanceTaskConfig> desiredState = Iterables.filter(
+              Optional.fromNullable(instructions.getDesiredState()).asSet(),
+              instanceFilter);
+
+          // Calculate result as max(existing, desired) per resource type.
+          return max(
+              instructionsToResources(initialState),
+              instructionsToResources(desiredState));
+        }
+      };
     }
 
     private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(a.getNumCpus() + b.getNumCpus())
-          .setRamMb(a.getRamMb() + b.getRamMb())
-          .setDiskMb(a.getDiskMb() + b.getDiskMb()));
+      return addAll(Arrays.asList(a, b));
+    }
+
+    private static IResourceAggregate addAll(Iterable<IResourceAggregate> aggregates)
{
+      IResourceAggregate total = EMPTY;
+      for (IResourceAggregate aggregate : aggregates) {
+        total = IResourceAggregate.build(new ResourceAggregate()
+            .setNumCpus(total.getNumCpus() + aggregate.getNumCpus())
+            .setRamMb(total.getRamMb() + aggregate.getRamMb())
+            .setDiskMb(total.getDiskMb() + aggregate.getDiskMb()));
+      }
+      return total;
     }
 
     private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b)
{
@@ -479,7 +509,7 @@ public interface QuotaManager {
     }
 
     private static IResourceAggregate scale(ITaskConfig taskConfig, int instanceCount) {
-      return ResourceAggregates.scale(fromTasks(ImmutableSet.of(taskConfig)), instanceCount);
+      return ResourceAggregates.scale(CONFIG_RESOURCES.apply(taskConfig), instanceCount);
     }
 
     private static IResourceAggregate scale(IJobConfiguration jobConfiguration) {
@@ -487,19 +517,7 @@ public interface QuotaManager {
     }
 
     private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
-      double cpu = 0;
-      int ramMb = 0;
-      int diskMb = 0;
-      for (ITaskConfig task : tasks) {
-        cpu += task.getNumCpus();
-        ramMb += task.getRamMb();
-        diskMb += task.getDiskMb();
-      }
-
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(cpu)
-          .setRamMb(ramMb)
-          .setDiskMb(diskMb));
+      return addAll(Iterables.transform(tasks, CONFIG_RESOURCES));
     }
 
     private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
@@ -513,14 +531,10 @@ public interface QuotaManager {
     private static int getUpdateInstanceCount(Set<IRange> ranges) {
       int instanceCount = 0;
       for (IRange range : ranges) {
-        instanceCount += instanceCountFromRange(range);
+        instanceCount += range.getLast() - range.getFirst() + 1;
       }
 
       return instanceCount;
     }
-
-    private static int instanceCountFromRange(IRange range) {
-      return range.getLast() - range.getFirst() + 1;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
index b530861..b87ae4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -19,7 +19,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
@@ -147,14 +146,7 @@ interface UpdateFactory {
 
     @VisibleForTesting
     static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups)
{
-      ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
-      for (IInstanceTaskConfig group : instanceGroups) {
-        for (IRange range : group.getInstances()) {
-          instanceIds.add(toRange(range));
-        }
-      }
-
-      return instanceIds.build().asSet(DiscreteDomain.integers());
+      return Updates.getInstanceIds(instanceGroups).asSet(DiscreteDomain.integers());
     }
 
     private static Optional<ITaskConfig> getConfig(

http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
index 776278c..6466473 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -15,13 +15,17 @@ package org.apache.aurora.scheduler.updater;
 
 import java.util.Set;
 
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IRange;
 
 /**
  * Utility functions for job updates.
@@ -53,4 +57,22 @@ public final class Updates {
       return IJobUpdateSummary.build(mutableSummary);
     }
   }
+
+  /**
+   * Creates a range set representing all instance IDs represented by a set of instance
+   * configurations included in a job update.
+   *
+   * @param configs Job update components.
+   * @return A range set representing the instance IDs mentioned in instance groupings.
+   */
+  public static ImmutableRangeSet<Integer> getInstanceIds(Set<IInstanceTaskConfig>
configs) {
+    ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder();
+    for (IInstanceTaskConfig config : configs) {
+      for (IRange range : config.getInstances()) {
+        builder.add(Range.closed(range.getFirst(), range.getLast()));
+      }
+    }
+
+    return builder.build();
+  }
 }


Mime
View raw message