aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Re-purposing addInstances RPC to act as scaleOut
Date Wed, 27 Jan 2016 08:10:49 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 9faec2f05 -> 0d316cfbe


Re-purposing addInstances RPC to act as scaleOut

Bugs closed: AURORA-1258

Reviewed at https://reviews.apache.org/r/42759/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0d316cfb
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0d316cfb
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0d316cfb

Branch: refs/heads/master
Commit: 0d316cfbe39d3ac17629540550af097cd44992f0
Parents: 9faec2f
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Wed Jan 27 00:07:31 2016 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Wed Jan 27 00:07:31 2016 -0800

----------------------------------------------------------------------
 NEWS                                            |   3 +-
 .../thrift/org/apache/aurora/gen/api.thrift     |  10 +-
 .../org/apache/aurora/scheduler/base/Query.java |   4 -
 .../ShiroAuthorizingParamInterceptor.java       |   9 +-
 .../aurora/scheduler/state/LockManagerImpl.java |   2 +-
 .../thrift/SchedulerThriftInterface.java        |  78 ++++++++---
 .../thrift/aop/AnnotatedAuroraAdmin.java        |   5 +-
 .../aurora/scheduler/thrift/Fixtures.java       |   3 +
 .../thrift/SchedulerThriftInterfaceTest.java    | 135 ++++++++++++++-----
 9 files changed, 179 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index e46b2cc..0e9f3b3 100644
--- a/NEWS
+++ b/NEWS
@@ -37,7 +37,8 @@
   controlled exclusively via the client with `aurora job restart --restart-threshold=[seconds]`.
 - Removed executor flag `--announcer-enable`.  Enabling the announcer previously required
both flags
   `--announcer-enable` and `--announcer-ensemble`, but now only `--announcer-ensemble` must
be set.
-
+- Deprecated `AddInstancesConfig` argument in `addInstances` thrift RPC. The new behavior
is to
+  increase job instance count (scale out) based on the task template pointed by instance
`key`.
 
 0.11.0
 ------

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index f0e330c..95313a0 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1027,10 +1027,14 @@ service AuroraSchedulerManager extends ReadOnlyScheduler {
   Response killTasks(1: TaskQuery query, 3: Lock lock, 4: JobKey job, 5: set<i32> instances)
 
   /**
-   * Adds new instances specified by the AddInstancesConfig. A job represented by the JobKey
must be
-   * protected by Lock.
+   * Adds new instances with the TaskConfig of the existing instance pointed by the key.
+   * TODO(maxim): remove AddInstancesConfig in AURORA-1595.
    */
-  Response addInstances(1: AddInstancesConfig config, 2: Lock lock)
+  Response addInstances(
+      1: AddInstancesConfig config,
+      2: Lock lock,
+      3: InstanceKey key,
+      4: i32 count)
 
   /**
    * Creates and saves a new Lock instance guarding against multiple mutating operations
within the

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index 7bf0afb..fbadfd3 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -110,10 +110,6 @@ public final class Query {
     return unscoped().byStatus(status, statuses);
   }
 
-  public static Builder statusScoped(Iterable<ScheduleStatus> statuses) {
-    return unscoped().byStatus(statuses);
-  }
-
   /**
    * A Builder of TaskQueries. Builders are immutable and provide access to a set of convenience
    * methods to return a new builder of another scope. Available scope filters include slave,

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
index 21e565e..3043dfa 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java
@@ -39,6 +39,7 @@ import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.gen.AddInstancesConfig;
+import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdateKey;
@@ -139,6 +140,9 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor {
           AddInstancesConfig._Fields.KEY,
           JobKey.class);
 
+  private static final FieldGetter<InstanceKey, JobKey> INSTANCE_KEY_GETTER =
+      new ThriftFieldGetter<>(InstanceKey.class, InstanceKey._Fields.JOB_KEY, JobKey.class);
+
   @SuppressWarnings("unchecked")
   private static final Set<FieldGetter<?, JobKey>> FIELD_GETTERS =
       ImmutableSet.of(
@@ -150,13 +154,12 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor
{
           JOB_UPDATE_KEY_GETTER,
           ADD_INSTANCES_CONFIG_GETTER,
           QUERY_TO_JOB_KEY,
+          INSTANCE_KEY_GETTER,
           new IdentityFieldGetter<>(JobKey.class));
 
   private static final Map<Class<?>, Function<?, Optional<JobKey>>>
FIELD_GETTERS_BY_TYPE =
       ImmutableMap.<Class<?>, Function<?, Optional<JobKey>>>builder()
-          .putAll(Maps.uniqueIndex(
-              FIELD_GETTERS,
-              (Function<FieldGetter<?, JobKey>, Class<?>>) FieldGetter::getStructClass))
+          .putAll(Maps.uniqueIndex(FIELD_GETTERS, FieldGetter::getStructClass))
           .build();
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index 59c9786..6da6c69 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -99,7 +99,7 @@ public class LockManagerImpl implements LockManager {
     if (!stored.equals(heldLock)) {
       if (stored.isPresent()) {
         throw new LockException(String.format(
-            "Unable to perform operation for: %s. Use override/cancel option.",
+            "Unable to perform operation for %s due to active lock held",
             formatLockKey(context)));
       } else if (heldLock.isPresent()) {
         throw new LockException(

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index c53c49e..6767024 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.thrift;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -23,6 +24,8 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
@@ -39,6 +42,7 @@ import org.apache.aurora.gen.ConfigRewrite;
 import org.apache.aurora.gen.DrainHostsResult;
 import org.apache.aurora.gen.EndMaintenanceResult;
 import org.apache.aurora.gen.Hosts;
+import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
@@ -748,18 +752,16 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
   }
 
   @Override
-  public Response addInstances(AddInstancesConfig config, @Nullable Lock mutableLock) {
-    requireNonNull(config);
-    checkNotBlank(config.getInstanceIds());
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey()));
+  public Response addInstances(
+      @Nullable AddInstancesConfig config,
+      @Nullable Lock mutableLock,
+      @Nullable InstanceKey key,
+      int count) {
 
-    ITaskConfig task;
-    try {
-      task = configurationManager.validateAndPopulate(ITaskConfig.build(config.getTaskConfig()));
-    } catch (TaskDescriptionException e) {
-      return error(INVALID_REQUEST, e);
-    }
+    IJobKey jobKey =
+        JobKeys.assertValid(IJobKey.build(config == null ? key.getJobKey() : config.getKey()));
 
+    Response response = empty();
     return storage.write(storeProvider -> {
       try {
         if (getCronJob(storeProvider, jobKey).isPresent()) {
@@ -770,23 +772,53 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
             ILockKey.build(LockKey.job(jobKey.newBuilder())),
             java.util.Optional.ofNullable(mutableLock).map(ILock::build));
 
-        Iterable<IScheduledTask> currentTasks = storeProvider.getTaskStore().fetchTasks(
-            Query.jobScoped(task.getJob()).active());
+        FluentIterable<IScheduledTask> currentTasks = FluentIterable.from(
+            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()));
+
+        ITaskConfig task;
+        Set<Integer> instanceIds;
+        if (config == null) {
+          if (count <= 0) {
+            return invalidRequest(INVALID_INSTANCE_COUNT);
+          }
+
+          Optional<IScheduledTask> templateTask = Iterables.tryFind(
+              currentTasks,
+              e -> e.getAssignedTask().getInstanceId() == key.getInstanceId());
+          if (!templateTask.isPresent()) {
+            return invalidRequest(INVALID_INSTANCE_ID);
+          }
+
+          task = templateTask.get().getAssignedTask().getTask();
+          int lastId = currentTasks
+              .transform(e -> e.getAssignedTask().getInstanceId())
+              .toList()
+              .stream()
+              .max(Comparator.naturalOrder()).get();
+
+          instanceIds = ContiguousSet.create(
+              Range.openClosed(lastId, lastId + count),
+              DiscreteDomain.integers());
+        } else {
+          checkNotBlank(config.getInstanceIds());
+          addMessage(response, "The AddInstancesConfig field is deprecated.");
+
+          task = configurationManager.validateAndPopulate(
+              ITaskConfig.build(config.getTaskConfig()));
+          instanceIds = ImmutableSet.copyOf(config.getInstanceIds());
+        }
 
         validateTaskLimits(
             task,
-            Iterables.size(currentTasks) + config.getInstanceIdsSize(),
-            quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize(), storeProvider));
+            Iterables.size(currentTasks) + instanceIds.size(),
+            quotaManager.checkInstanceAddition(task, instanceIds.size(), storeProvider));
 
-        stateManager.insertPendingTasks(
-            storeProvider,
-            task,
-            ImmutableSet.copyOf(config.getInstanceIds()));
+        stateManager.insertPendingTasks(storeProvider, task, instanceIds);
 
-        return ok();
+        return response.setResponseCode(OK);
       } catch (LockException e) {
         return error(LOCK_ERROR, e);
-      } catch (TaskValidationException | IllegalArgumentException e) {
+      } catch (TaskDescriptionException | TaskValidationException | IllegalArgumentException
e) {
         return error(INVALID_REQUEST, e);
       }
     });
@@ -1121,4 +1153,10 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
 
   @VisibleForTesting
   static final String INVALID_PULSE_TIMEOUT = "blockIfNoPulsesAfterMs must be positive.";
+
+  @VisibleForTesting
+  static final String INVALID_INSTANCE_ID = "No active task found for a given instance ID.";
+
+  @VisibleForTesting
+  static final String INVALID_INSTANCE_COUNT = "Instance count must be positive.";
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
index c374343..f2f69f9 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
@@ -19,6 +19,7 @@ import javax.annotation.Nullable;
 
 import org.apache.aurora.gen.AddInstancesConfig;
 import org.apache.aurora.gen.AuroraAdmin;
+import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdateKey;
@@ -78,7 +79,9 @@ public interface AnnotatedAuroraAdmin extends AuroraAdmin.Iface {
   @Override
   Response addInstances(
       @AuthorizingParam @Nullable AddInstancesConfig config,
-      @Nullable Lock lock) throws TException;
+      @Nullable Lock lock,
+      @AuthorizingParam @Nullable InstanceKey key,
+      int count) throws TException;
 
   @Override
   Response acquireLock(

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
index 72d2182..e456056 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
@@ -29,6 +29,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Container;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobSummary;
 import org.apache.aurora.gen.JobSummaryResult;
@@ -80,6 +81,8 @@ final class Fixtures {
       IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048));
   static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
+  static final InstanceKey INSTANCE_KEY = new InstanceKey(JOB_KEY.newBuilder(), 0);
+  static final TaskConfig INVALID_TASK_CONFIG = defaultTask(true).setTier(",");
 
   private Fixtures() {
     // Utility class.

http://git-wip-us.apache.org/repos/asf/aurora/blob/0d316cfb/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 84356d6..4ad9211 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -53,6 +53,7 @@ import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.ListBackupsResult;
+import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.MesosContainer;
@@ -125,6 +126,8 @@ import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DED
 import static org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException;
 import static org.apache.aurora.scheduler.thrift.Fixtures.CRON_JOB;
 import static org.apache.aurora.scheduler.thrift.Fixtures.ENOUGH_QUOTA;
+import static org.apache.aurora.scheduler.thrift.Fixtures.INSTANCE_KEY;
+import static org.apache.aurora.scheduler.thrift.Fixtures.INVALID_TASK_CONFIG;
 import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_KEY;
 import static org.apache.aurora.scheduler.thrift.Fixtures.JOB_NAME;
 import static org.apache.aurora.scheduler.thrift.Fixtures.LOCK;
@@ -309,7 +312,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testCreateJobFailsConfigCheck() throws Exception {
-    IJobConfiguration job = IJobConfiguration.build(makeJob(null));
+    IJobConfiguration job = IJobConfiguration.build(makeJob(INVALID_TASK_CONFIG));
     control.replay();
 
     assertResponse(
@@ -584,6 +587,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()
             .setTaskId(taskId)
+            .setInstanceId(0)
             .setTask(new TaskConfig()
                 .setJob(JOB_KEY.newBuilder().setName(jobName))
                 .setOwner(ROLE_IDENTITY)
@@ -971,7 +975,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   @Test
   public void testScheduleCronJobFailedTaskConfigValidation() throws Exception {
     control.replay();
-    IJobConfiguration job = IJobConfiguration.build(makeJob(null));
+    IJobConfiguration job = IJobConfiguration.build(makeJob(INVALID_TASK_CONFIG));
     assertResponse(
         INVALID_REQUEST,
         thrift.scheduleCronJob(job.newBuilder(), null));
@@ -1337,7 +1341,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     // Validate key is populated during sanitizing.
     AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
     config.getTaskConfig().unsetJob();
-    assertOkResponse(thrift.addInstances(config, LOCK.newBuilder()));
+    assertOkResponse(deprecatedAddInstances(config, LOCK.newBuilder()));
   }
 
   @Test
@@ -1357,104 +1361,162 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     control.replay();
 
-    assertOkResponse(thrift.addInstances(config, null));
+    Response response = deprecatedAddInstances(config, null);
+    assertOkResponse(response);
+    assertMessageMatches(response, "The AddInstancesConfig field is deprecated.");
+  }
+
+  @Test
+  public void testAddInstancesWithInstanceKey() throws Exception {
+    expectNoCronJob();
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    IScheduledTask activeTask = buildScheduledTask();
+    ITaskConfig task = activeTask.getAssignedTask().getTask();
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
+    expect(taskIdGenerator.generate(task, 3)).andReturn(TASK_ID);
+    expect(quotaManager.checkInstanceAddition(
+        task,
+        2,
+        storageUtil.mutableStoreProvider)).andReturn(ENOUGH_QUOTA);
+    stateManager.insertPendingTasks(
+        storageUtil.mutableStoreProvider,
+        task,
+        ImmutableSet.of(1, 2));
+
+    control.replay();
+
+    assertOkResponse(newAddInstances(INSTANCE_KEY, 2));
+  }
+
+  @Test
+  public void testAddInstancesWithInstanceKeyFailsWithNoInstance() throws Exception {
+    expectNoCronJob();
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
+
+    control.replay();
+
+    assertEquals(
+        invalidResponse(SchedulerThriftInterface.INVALID_INSTANCE_ID),
+        newAddInstances(INSTANCE_KEY, 2));
+  }
+
+  @Test
+  public void testAddInstancesWithInstanceKeyFailsInvalidCount() throws Exception {
+    expectNoCronJob();
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
+
+    control.replay();
+
+    assertEquals(
+        invalidResponse(SchedulerThriftInterface.INVALID_INSTANCE_COUNT),
+        newAddInstances(INSTANCE_KEY, 0));
   }
 
   @Test
   public void testAddInstancesFailsConfigCheck() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true).setJobName(null));
+    AddInstancesConfig config = createInstanceConfig(INVALID_TASK_CONFIG);
+    expectNoCronJob();
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(INVALID_REQUEST, deprecatedAddInstances(config, null));
   }
 
   @Test
   public void testAddInstancesFailsCronJob() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
     expectCronJob();
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1));
   }
 
   @Test(expected = StorageException.class)
   public void testAddInstancesFailsWithNonTransient() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
     expect(storageUtil.jobStore.fetchJob(JOB_KEY)).andThrow(new StorageException("no retry"));
 
     control.replay();
 
-    thrift.addInstances(config, LOCK.newBuilder());
+    newAddInstances(INSTANCE_KEY, 1);
   }
 
   @Test
   public void testAddInstancesLockCheckFails() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
     expectNoCronJob();
-    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK));
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
     expectLastCall().andThrow(new LockException("Failed lock check."));
 
     control.replay();
 
-    assertResponse(LOCK_ERROR, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(LOCK_ERROR, newAddInstances(INSTANCE_KEY, 1));
   }
 
   @Test
   public void testAddInstancesFailsTaskIdLength() throws Exception {
-    ITaskConfig populatedTask = ITaskConfig.build(populatedTask());
-    AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
+    IScheduledTask activeTask = buildScheduledTask();
+    ITaskConfig task = activeTask.getAssignedTask().getTask();
     expectNoCronJob();
-    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK));
-    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
     expect(quotaManager.checkInstanceAddition(
         anyObject(ITaskConfig.class),
         anyInt(),
         eq(storageUtil.mutableStoreProvider))).andReturn(ENOUGH_QUOTA);
-    expect(taskIdGenerator.generate(populatedTask, 1))
+    expect(taskIdGenerator.generate(task, 2))
         .andReturn(Strings.repeat("a", MAX_TASK_ID_LENGTH + 1));
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1));
   }
 
   @Test
   public void testAddInstancesFailsQuotaCheck() throws Exception {
-    ITaskConfig populatedTask = ITaskConfig.build(populatedTask());
-    AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
+    IScheduledTask activeTask = buildScheduledTask();
+    ITaskConfig task = activeTask.getAssignedTask().getTask();
     expectNoCronJob();
-    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK));
-    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
-    expect(taskIdGenerator.generate(populatedTask, 1))
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
+    expect(taskIdGenerator.generate(task, 2))
         .andReturn(TASK_ID);
-    expectInstanceQuotaCheck(populatedTask, NOT_ENOUGH_QUOTA);
+    expectInstanceQuotaCheck(task, NOT_ENOUGH_QUOTA);
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1));
   }
 
   @Test
   public void testAddInstancesInstanceCollisionFailure() throws Exception {
-    ITaskConfig populatedTask = ITaskConfig.build(populatedTask());
-    AddInstancesConfig config = createInstanceConfig(populatedTask.newBuilder());
+    IScheduledTask activeTask = buildScheduledTask();
+    ITaskConfig task = activeTask.getAssignedTask().getTask();
     expectNoCronJob();
-    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.of(LOCK));
-    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
-    expect(taskIdGenerator.generate(populatedTask, 1))
+    lockManager.validateIfLocked(LOCK_KEY, java.util.Optional.empty());
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), activeTask);
+    expect(taskIdGenerator.generate(task, 2))
         .andReturn(TASK_ID);
-    expectInstanceQuotaCheck(populatedTask, ENOUGH_QUOTA);
+    expectInstanceQuotaCheck(task, ENOUGH_QUOTA);
     stateManager.insertPendingTasks(
         storageUtil.mutableStoreProvider,
-        populatedTask,
-        ImmutableSet.of(0));
+        task,
+        ImmutableSet.of(1));
     expectLastCall().andThrow(new IllegalArgumentException("instance collision"));
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.addInstances(config, LOCK.newBuilder()));
+    assertResponse(INVALID_REQUEST, newAddInstances(INSTANCE_KEY, 1));
+  }
+
+  private Response newAddInstances(InstanceKey key, int count) throws Exception {
+    return thrift.addInstances(null, null, key, count);
+  }
+
+  private Response deprecatedAddInstances(AddInstancesConfig config, Lock lock) throws Exception
{
+    return thrift.addInstances(config, lock, null, 0);
   }
 
   @Test
@@ -1725,8 +1787,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testStartUpdateFailsConfigValidation() throws Exception {
-    JobUpdateRequest request =
-        buildJobUpdateRequest(populatedTask().setIsService(true).setNumCpus(-1));
+    JobUpdateRequest request = buildJobUpdateRequest(INVALID_TASK_CONFIG.setIsService(true));
 
     control.replay();
     assertResponse(INVALID_REQUEST, thrift.startJobUpdate(request, AUDIT_MESSAGE));


Mime
View raw message