aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Implementing startJobUpdate thrift API.
Date Thu, 14 Aug 2014 22:41:18 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master ce808e309 -> 040e71ef9


Implementing startJobUpdate thrift API.

Bugs closed: AURORA-649

Reviewed at https://reviews.apache.org/r/24655/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/040e71ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/040e71ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/040e71ef

Branch: refs/heads/master
Commit: 040e71ef9862dbe1711d8e2d84e905c6ed88fd4e
Parents: ce808e3
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Aug 14 15:40:47 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Aug 14 15:40:47 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/state/JobUpdater.java      |  46 +++++
 .../aurora/scheduler/state/JobUpdaterImpl.java  | 163 ++++++++++++++++
 .../aurora/scheduler/state/StateModule.java     |   3 +
 .../thrift/SchedulerThriftInterface.java        |  67 ++++++-
 .../scheduler/state/JobUpdaterImplTest.java     | 185 +++++++++++++++++++
 .../thrift/SchedulerThriftInterfaceTest.java    | 108 +++++++++--
 .../aurora/scheduler/thrift/ThriftIT.java       |   2 +
 7 files changed, 556 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
new file mode 100644
index 0000000..f153444
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
+
+/**
+ * Coordinates job updates and exposes control primitives (start, pause, resume, abort).
+ */
+public interface JobUpdater {
+
+  /**
+   * Saves the job update request and starts the update process.
+   *
+   * @param request Job update request.
+   * @param user User who initiated the update.
+   * @return Saved job update ID.
+   * @throws UpdaterException Throws if update fails to start for any reason.
+   */
+  String startJobUpdate(IJobUpdateRequest request, String user) throws UpdaterException;
+
+  /**
+   * Thrown when job update related operation failed.
+   */
+  class UpdaterException extends Exception {
+
+    public UpdaterException(String message) {
+      super(message);
+    }
+
+    public UpdaterException(String message, Throwable e) {
+      super(message, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
new file mode 100644
index 0000000..6bcdf62
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateConfiguration;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of the {@link JobUpdater}.
+ */
+class JobUpdaterImpl implements JobUpdater {
+
+  private final Storage storage;
+  private final UUIDGenerator uuidGenerator;
+  private final Clock clock;
+
+  private static final Function<Integer, Range<Integer>> INSTANCE_ID_TO_RANGE
=
+      new Function<Integer, Range<Integer>>() {
+        @Override
+        public Range<Integer> apply(Integer id) {
+          return Range.closed(id, id).canonical(DiscreteDomain.integers());
+        }
+      };
+
+  private static final Function<Collection<Range<Integer>>, Set<org.apache.aurora.gen.Range>>
+      REDUCE_RANGES = new Function<Collection<Range<Integer>>, Set<org.apache.aurora.gen.Range>>()
{
+        @Override
+        public Set<org.apache.aurora.gen.Range> apply(Collection<Range<Integer>>
input) {
+          RangeSet<Integer> rangeSet = TreeRangeSet.create();
+          for (Range<Integer> range : input) {
+            rangeSet.add(range);
+          }
+
+          ImmutableSet.Builder<org.apache.aurora.gen.Range> builder = ImmutableSet.builder();
+          for (Range<Integer> range : rangeSet.asRanges()) {
+            // Canonical range of integers is closedOpen, which makes extracting upper bound
+            // a problem without resorting to subtraction. The workaround is to convert range
+            // into Contiguous set and get first/last.
+            ContiguousSet<Integer> set = ContiguousSet.create(range, DiscreteDomain.integers());
+            builder.add(new org.apache.aurora.gen.Range(
+                set.first(),
+                set.last()));
+          }
+
+          return builder.build();
+        }
+      };
+
+  @Inject
+  JobUpdaterImpl(Storage storage, Clock clock, UUIDGenerator uuidGenerator) {
+    this.storage = requireNonNull(storage);
+    this.clock = requireNonNull(clock);
+    this.uuidGenerator = requireNonNull(uuidGenerator);
+  }
+
+  @Override
+  public String startJobUpdate(final IJobUpdateRequest request, final String user)
+      throws UpdaterException {
+
+    return storage.write(new MutateWork<String, UpdaterException>() {
+      @Override
+      public String apply(Storage.MutableStoreProvider storeProvider) throws UpdaterException
{
+        String updateId = uuidGenerator.createNew().toString();
+
+        IJobUpdate update = IJobUpdate.build(new JobUpdate()
+            .setSummary(new JobUpdateSummary()
+                .setJobKey(request.getJobKey().newBuilder())
+                .setUpdateId(updateId)
+                .setUser(user))
+            .setConfiguration(new JobUpdateConfiguration()
+                .setSettings(request.getSettings().newBuilder())
+                .setInstanceCount(request.getInstanceCount())
+                .setNewTaskConfig(request.getTaskConfig().newBuilder())
+                .setOldTaskConfigs(buildOldTaskConfigs(request.getJobKey(), storeProvider))));
+
+        IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
+            .setStatus(JobUpdateStatus.ROLLING_FORWARD)
+            .setTimestampMs(clock.nowMillis()));
+
+        try {
+          storeProvider.getJobUpdateStore().saveJobUpdate(update);
+          storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
+        } catch (StorageException e) {
+          throw new UpdaterException("Failed to start update.", e);
+        }
+
+        // TODO(maxim): wire in updater logic when ready.
+
+        return updateId;
+      }
+    });
+  }
+
+  private Set<InstanceTaskConfig> buildOldTaskConfigs(
+      IJobKey jobKey,
+      StoreProvider storeProvider) {
+
+    Set<IScheduledTask> tasks =
+        storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
+
+    Map<ITaskConfig, Set<org.apache.aurora.gen.Range>> rangesByTask = Maps.transformValues(
+        Multimaps.transformValues(
+            Multimaps.index(tasks, Tasks.SCHEDULED_TO_INFO),
+            Functions.compose(INSTANCE_ID_TO_RANGE, Tasks.SCHEDULED_TO_INSTANCE_ID)).asMap(),
+        REDUCE_RANGES);
+
+    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
+    for (Map.Entry<ITaskConfig, Set<org.apache.aurora.gen.Range>> entry : rangesByTask.entrySet())
{
+      builder.add(new InstanceTaskConfig()
+          .setTask(entry.getKey().newBuilder())
+          .setInstances(entry.getValue()));
+    }
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 2c712ef..cc1eee4 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -47,6 +47,9 @@ public class StateModule extends AbstractModule {
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
 
+    bind(JobUpdater.class).to(JobUpdaterImpl.class);
+    bind(JobUpdaterImpl.class).in(Singleton.class);
+
     bindMaintenanceController(binder());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 9e9f397..0802ee0 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -90,6 +90,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduleStatusResult;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.SessionKey;
+import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.StartMaintenanceResult;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskQuery;
@@ -111,6 +112,8 @@ import org.apache.aurora.scheduler.metadata.NearestFit;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
+import org.apache.aurora.scheduler.state.JobUpdater;
+import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -127,6 +130,7 @@ import org.apache.aurora.scheduler.storage.backup.StorageBackup;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -184,6 +188,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final QuotaManager quotaManager;
   private final NearestFit nearestFit;
   private final StateManager stateManager;
+  private final JobUpdater jobUpdater;
 
   @Inject
   SchedulerThriftInterface(
@@ -198,7 +203,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       MaintenanceController maintenance,
       QuotaManager quotaManager,
       NearestFit nearestFit,
-      StateManager stateManager) {
+      StateManager stateManager,
+      JobUpdater jobUpdater) {
 
     this(storage,
         schedulerCore,
@@ -211,7 +217,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         cronPredictor,
         quotaManager,
         nearestFit,
-        stateManager);
+        stateManager,
+        jobUpdater);
   }
 
   @VisibleForTesting
@@ -227,7 +234,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       CronPredictor cronPredictor,
       QuotaManager quotaManager,
       NearestFit nearestFit,
-      StateManager stateManager) {
+      StateManager stateManager,
+      JobUpdater jobUpdater) {
 
     this.storage = requireNonNull(storage);
     this.schedulerCore = requireNonNull(schedulerCore);
@@ -241,6 +249,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.quotaManager = requireNonNull(quotaManager);
     this.nearestFit = requireNonNull(nearestFit);
     this.stateManager = requireNonNull(stateManager);
+    this.jobUpdater = requireNonNull(jobUpdater);
   }
 
   @Override
@@ -1247,8 +1256,56 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response startJobUpdate(JobUpdateRequest request, Lock lock, SessionKey session)
{
-    throw new UnsupportedOperationException("Not implemented");
+  public Response startJobUpdate(
+      JobUpdateRequest mutableRequest,
+      Lock mutableLock,
+      SessionKey session) {
+
+    // TODO(maxim): validate JobUpdateRequest fields.
+    requireNonNull(mutableRequest);
+    requireNonNull(session);
+
+    final ILock lock = ILock.build(requireNonNull(mutableLock));
+    final Response response = emptyResponse();
+
+    final SessionContext context;
+    final IJobUpdateRequest request;
+    try {
+      context = sessionValidator.checkAuthenticated(
+          session,
+          ImmutableSet.of(mutableRequest.getJobKey().getRole()));
+
+      request = IJobUpdateRequest.build(new JobUpdateRequest(mutableRequest).setTaskConfig(
+          ConfigurationManager.validateAndPopulate(
+              ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder()));
+
+    } catch (AuthFailedException e) {
+      return addMessage(response, AUTH_FAILED, e);
+    } catch (TaskDescriptionException e) {
+      return addMessage(response, INVALID_REQUEST, e);
+    }
+
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          lockManager.validateIfLocked(
+              ILockKey.build(LockKey.job(request.getJobKey().newBuilder())),
+              Optional.of(lock));
+        } catch (LockException e) {
+          return addMessage(response, LOCK_ERROR, e);
+        }
+
+        // TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
+
+        try {
+          String updateId = jobUpdater.startJobUpdate(request, context.getIdentity());
+          return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
+        } catch (UpdaterException e) {
+          return addMessage(response, INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
new file mode 100644
index 0000000..1f985fb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateConfiguration;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateRequest;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class JobUpdaterImplTest extends EasyMockTest {
+
+  private static final String UPDATE_ID = "82d6d790-3212-11e3-aa6e-0800200c9a74";
+  private static final UUID ID = UUID.fromString(UPDATE_ID);
+  private static final IJobKey JOB = JobKeys.from("role", "env", "name");
+  private static final Identity IDENTITY = new Identity("role", "user");
+  private static final Long TIMESTAMP = 1234L;
+
+  private JobUpdater updater;
+  private StorageTestUtil storageUtil;
+  private Clock clock;
+  private UUIDGenerator idGenerator;
+
+  @Before
+  public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+
+    clock = createMock(Clock.class);
+    idGenerator = createMock(UUIDGenerator.class);
+
+    updater = new JobUpdaterImpl(storageUtil.storage, clock, idGenerator);
+  }
+
+  @Test
+  public void testSaveUpdate() throws Exception {
+    IScheduledTask oldTask1 = buildScheduledTask(0, 5);
+    IScheduledTask oldTask2 = buildScheduledTask(1, 5);
+    IScheduledTask oldTask3 = buildScheduledTask(2, 7);
+    IScheduledTask oldTask4 = buildScheduledTask(3, 7);
+    IScheduledTask oldTask5 = buildScheduledTask(4, 5);
+    IScheduledTask oldTask6 = buildScheduledTask(5, 5);
+    IScheduledTask oldTask7 = buildScheduledTask(6, 5);
+
+    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
+
+    IJobUpdate update = buildJobUpdate(6, newTask, ImmutableMap.of(
+        oldTask1.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 1), new Range(4,
6)),
+        oldTask3.getAssignedTask().getTask(), ImmutableSet.of(new Range(2, 3))
+    ));
+
+    expect(idGenerator.createNew()).andReturn(ID);
+    expect(clock.nowMillis()).andReturn(TIMESTAMP);
+
+    storageUtil.expectTaskFetch(
+        Query.unscoped().byJob(JOB).active(),
+        oldTask1,
+        oldTask2,
+        oldTask3,
+        oldTask4,
+        oldTask5,
+        oldTask6,
+        oldTask7);
+
+    storageUtil.updateStore.saveJobUpdate(update);
+    storageUtil.updateStore.saveJobUpdateEvent(buildUpdateEvent(), UPDATE_ID);
+
+    control.replay();
+
+    assertEquals(UPDATE_ID, updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser()));
+  }
+
+  @Test(expected = UpdaterException.class)
+  public void testSaveUpdateFails() throws Exception {
+    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
+
+    IJobUpdate update = buildJobUpdate(
+        6,
+        newTask,
+        ImmutableMap.<ITaskConfig, ImmutableSet<Range>>of());
+
+    expect(idGenerator.createNew()).andReturn(ID);
+    expect(clock.nowMillis()).andReturn(TIMESTAMP);
+
+    storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB).active());
+
+    storageUtil.updateStore.saveJobUpdate(update);
+    expectLastCall().andThrow(new Storage.StorageException("fail"));
+
+    control.replay();
+
+    updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser());
+  }
+
+  private static IJobUpdateRequest buildJobRequest(IJobUpdate update) {
+    return IJobUpdateRequest.build(new JobUpdateRequest()
+        .setInstanceCount(update.getConfiguration().getInstanceCount())
+        .setJobKey(update.getSummary().getJobKey().newBuilder())
+        .setSettings(update.getConfiguration().getSettings().newBuilder())
+        .setTaskConfig(update.getConfiguration().getNewTaskConfig().newBuilder()));
+  }
+
+  private static IScheduledTask buildScheduledTask(int instanceId, long ramMb) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setInstanceId(instanceId)
+            .setTask(new TaskConfig()
+                .setRamMb(ramMb) // Simulates unique task config.
+                .setOwner(IDENTITY)
+                .setEnvironment(JOB.getEnvironment())
+                .setJobName(JOB.getName()))));
+  }
+
+  private static IJobUpdate buildJobUpdate(
+      int instanceCount,
+      ITaskConfig newConfig,
+      ImmutableMap<ITaskConfig, ImmutableSet<Range>> oldConfigMap) {
+
+    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
+    for (Map.Entry<ITaskConfig, ImmutableSet<Range>> entry : oldConfigMap.entrySet())
{
+      builder.add(new InstanceTaskConfig(entry.getKey().newBuilder(), entry.getValue()));
+    }
+
+    return IJobUpdate.build(new JobUpdate()
+        .setSummary(new JobUpdateSummary()
+            .setJobKey(JOB.newBuilder())
+            .setUpdateId(UPDATE_ID)
+            .setUser(IDENTITY.getUser()))
+        .setConfiguration(new JobUpdateConfiguration()
+            .setSettings(new JobUpdateSettings())
+            .setInstanceCount(instanceCount)
+            .setNewTaskConfig(newConfig.newBuilder())
+            .setOldTaskConfigs(builder.build())));
+  }
+
+  private static IJobUpdateEvent buildUpdateEvent() {
+    return IJobUpdateEvent.build(new JobUpdateEvent()
+        .setStatus(JobUpdateStatus.ROLLING_FORWARD)
+        .setTimestampMs(TIMESTAMP));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index d34bd6f..7dbf97a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -64,6 +64,8 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobStats;
 import org.apache.aurora.gen.JobSummary;
 import org.apache.aurora.gen.JobSummaryResult;
+import org.apache.aurora.gen.JobUpdateRequest;
+import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
@@ -99,6 +101,8 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.state.JobUpdater;
+import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -110,6 +114,7 @@ import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -163,6 +168,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
   private static final Lock DEFAULT_LOCK = null;
   private static final String TASK_ID = "task_id";
+  private static final String UPDATE_ID = "82d6d790-3212-11e3-aa6e-0800200c9a74";
 
   private static final IResourceAggregate QUOTA =
       IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048));
@@ -191,6 +197,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private QuotaManager quotaManager;
   private NearestFit nearestFit;
   private StateManager stateManager;
+  private JobUpdater jobUpdater;
 
   @Before
   public void setUp() throws Exception {
@@ -209,6 +216,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     quotaManager = createMock(QuotaManager.class);
     nearestFit = createMock(NearestFit.class);
     stateManager = createMock(StateManager.class);
+    jobUpdater = createMock(JobUpdater.class);
 
     // Use guice and install AuthModule to apply AOP-style auth layer.
     Module testModule = new AbstractModule() {
@@ -229,6 +237,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         bind(CronPredictor.class).toInstance(cronPredictor);
         bind(NearestFit.class).toInstance(nearestFit);
         bind(StateManager.class).toInstance(stateManager);
+        bind(JobUpdater.class).toInstance(jobUpdater);
       }
     };
     Injector injector = Guice.createInjector(testModule, new AopModule());
@@ -1599,14 +1608,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testAddInstances() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
+    AddInstancesConfig config = createInstanceConfig(populatedTask());
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
     scheduler.addInstances(
-        eq(JOB_KEY),
-        eq(ImmutableSet.copyOf(config.getInstanceIds())),
-        anyObject(ITaskConfig.class));
+        JOB_KEY,
+        ImmutableSet.copyOf(config.getInstanceIds()),
+        ITaskConfig.build(config.getTaskConfig()));
 
     control.replay();
 
@@ -1615,14 +1624,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testAddInstancesWithNullLock() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
+    AddInstancesConfig config = createInstanceConfig(populatedTask());
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
     scheduler.addInstances(
-        eq(JOB_KEY),
-        eq(ImmutableSet.copyOf(config.getInstanceIds())),
-        anyObject(ITaskConfig.class));
+        JOB_KEY,
+        ImmutableSet.copyOf(config.getInstanceIds()),
+        ITaskConfig.build(config.getTaskConfig()));
     control.replay();
 
     assertOkResponse(thrift.addInstances(config, null, SESSION));
@@ -1630,14 +1639,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testAddInstancesFails() throws Exception {
-    AddInstancesConfig config = createInstanceConfig(defaultTask(true));
+    AddInstancesConfig config = createInstanceConfig(populatedTask());
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
     scheduler.addInstances(
-        eq(JOB_KEY),
-        eq(ImmutableSet.copyOf(config.getInstanceIds())),
-        anyObject(ITaskConfig.class));
+        JOB_KEY,
+        ImmutableSet.copyOf(config.getInstanceIds()),
+        ITaskConfig.build(config.getTaskConfig()));
     expectLastCall().andThrow(new ScheduleException("Failed"));
 
     control.replay();
@@ -1798,6 +1807,71 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         response.getResult().getGetQuotaResult().getNonProdConsumption());
   }
 
+  @Test
+  public void testStartUpdate() throws Exception {
+    JobUpdateRequest request = createJobRequest(populatedTask());
+    expectAuth(ROLE, true);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER)).andReturn(UPDATE_ID);
+
+    control.replay();
+
+    Response response =
+        assertOkResponse(thrift.startJobUpdate(request, LOCK.newBuilder(), SESSION));
+    assertEquals(UPDATE_ID, response.getResult().getStartJobUpdateResult().getUpdateId());
+  }
+
+  @Test
+  public void testStartUpdateFailsAuth() throws Exception {
+    JobUpdateRequest request = createJobRequest(populatedTask());
+    expectAuth(ROLE, false);
+
+    control.replay();
+    assertResponse(AUTH_FAILED, thrift.startJobUpdate(request, LOCK.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testStartUpdateFailsConfigValidation() throws Exception {
+    JobUpdateRequest request = createJobRequest(populatedTask().setJobName(null));
+    expectAuth(ROLE, true);
+
+    control.replay();
+    assertResponse(INVALID_REQUEST, thrift.startJobUpdate(request, LOCK.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testStartUpdateFailsLockValidation() throws Exception {
+    JobUpdateRequest request = createJobRequest(populatedTask());
+    expectAuth(ROLE, true);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+    expectLastCall().andThrow(new LockException("lock failed"));
+
+    control.replay();
+
+    assertResponse(LOCK_ERROR, thrift.startJobUpdate(request, LOCK.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testStartUpdateFailsInUpdater() throws Exception {
+    JobUpdateRequest request = createJobRequest(populatedTask());
+    expectAuth(ROLE, true);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER))
+        .andThrow(new UpdaterException("failed update"));
+
+    control.replay();
+
+    assertResponse(INVALID_REQUEST, thrift.startJobUpdate(request, LOCK.newBuilder(), SESSION));
+  }
+
+  private static JobUpdateRequest createJobRequest(TaskConfig config) {
+    return new JobUpdateRequest()
+        .setInstanceCount(5)
+        .setJobKey(JOB_KEY.newBuilder())
+        .setSettings(new JobUpdateSettings())
+        .setTaskConfig(config);
+  }
+
   private static JobConfiguration makeJob() {
     return makeJob(nonProductionTask(), 1);
   }
@@ -1872,7 +1946,15 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         .setNumCpus(1)
         .setRamMb(1024)
         .setDiskMb(1024)
-        .setProduction(production);
+        .setProduction(production)
+        .setRequestedPorts(ImmutableSet.<String>of())
+        .setTaskLinks(ImmutableMap.<String, String>of())
+        .setMaxTaskFailures(1);
+  }
+
+  private static TaskConfig populatedTask() {
+    return defaultTask(true).setConstraints(ImmutableSet.of(
+        new Constraint("host", TaskConstraint.limit(new LimitConstraint(1)))));
   }
 
   private static TaskConfig productionTask() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/040e71ef/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 2562ff9..43265fd 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -35,6 +35,7 @@ import org.apache.aurora.gen.SessionKey;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.state.JobUpdater;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
@@ -154,6 +155,7 @@ public class ThriftIT extends EasyMockTest {
             bindMock(LockManager.class);
             bindMock(ShutdownRegistry.class);
             bindMock(StateManager.class);
+            bindMock(JobUpdater.class);
             storageTestUtil = new StorageTestUtil(ThriftIT.this);
             bind(Storage.class).toInstance(storageTestUtil.storage);
             bind(NonVolatileStorage.class).toInstance(storageTestUtil.storage);


Mime
View raw message