aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject aurora git commit: Add rollback functionality to the scheduler
Date Thu, 11 Aug 2016 20:57:33 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 90846640b -> a4fdf284d


Add rollback functionality to the scheduler

For active job updates in the ROLLING_FORWARD, ROLL_BACK_PAUSED,
ROLL_BACK_AWAITING_PULSE, ROLL_FORWARD_PAUSED or ROLL_FORWARD_AWAITING_PULSE
state, it is now possible to initiate a rollback by calling the corresponding
API function. Rollback is also supported in the aurora CLI tool via a new
command: aurora update rollback CLUSTER/ROLE/ENV/NAME

Bugs closed: AURORA-1721

Reviewed at https://reviews.apache.org/r/50168/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a4fdf284
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a4fdf284
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a4fdf284

Branch: refs/heads/master
Commit: a4fdf284d878c9430f267e2051a1ae97355d0ddb
Parents: 9084664
Author: Igor Morozov <igmorv@gmail.com>
Authored: Thu Aug 11 13:56:48 2016 -0700
Committer: Zameer Manji <zmanji@apache.org>
Committed: Thu Aug 11 13:56:48 2016 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +-
 RELEASE-NOTES.md                                |   2 +
 .../thrift/org/apache/aurora/gen/api.thrift     |   9 +
 .../thrift/SchedulerThriftInterface.java        |   8 +
 .../thrift/aop/AnnotatedAuroraAdmin.java        |   5 +
 .../scheduler/updater/JobUpdateController.java  |  20 ++
 .../updater/JobUpdateControllerImpl.java        |   7 +
 .../updater/JobUpdateStateMachine.java          |  14 +-
 .../python/apache/aurora/client/api/__init__.py |  11 +
 .../python/apache/aurora/client/cli/update.py   |  97 ++++++--
 .../updater/JobUpdateStateMachineTest.java      |   2 +
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 220 ++++++++++++++++++-
 .../aurora/client/api/test_scheduler_client.py  |   5 +
 .../apache/aurora/client/cli/test_supdate.py    |  86 ++++++++
 14 files changed, 459 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fc6a46d..b1dce58 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,11 +1,10 @@
 Aurora 0.15.0
 --------------------------------------------------------------------------------
 ## Task
-    * [AURORA-1725] - Expose tier configurations as a debug page 
+    * [AURORA-1725] - Expose tier configurations as a debug page
     * [AURORA-1458] - Add tier into the UI "show config" summary
     * [AURORA-1720] - Broken link in http://aurora.apache.org/
 
-
 Aurora 0.14.0
 --------------------------------------------------------------------------------
 ## Bug

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 50f9b83..1819eaa 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -29,6 +29,8 @@
   documentation.
 - The `ExecutorInfo.source` field is deprecated and has been replaced with a label named `source`.
   It will be removed from Mesos in a future release.
+- Add rollback API to the scheduler and a new client command to support rolling back
+  active job updates to their initial state.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index b799cce..c5765b7 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1106,6 +1106,15 @@ service AuroraSchedulerManager extends ReadOnlyScheduler {
       3: string message)
 
   /**
+   * Rolls back the specified active job update to its initial state.
+   */
+  Response rollbackJobUpdate(
+      /** The update to rollback. */
+      1: JobUpdateKey key,
+      /** A user-specified message to include with the induced job update state change. */
+      2: string message)
+
+  /**
    * Allows progress of the job update in case blockIfNoPulsesAfterMs is specified in
    * JobUpdateSettings. Unblocks progress if the update was previously blocked.
    * Responds with ResponseCode.INVALID_REQUEST in case an unknown update key is specified.

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index b534abf..929d021 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -979,6 +979,14 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
   }
 
   @Override
+  public Response rollbackJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
+    return changeJobUpdateState(
+        mutableKey,
+        JobUpdateController::rollback,
+        Optional.fromNullable(message));
+  }
+
+  @Override
   public Response pulseJobUpdate(JobUpdateKey mutableUpdateKey) {
     IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey);
     try {

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
index 9243c92..bfc3dc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/AnnotatedAuroraAdmin.java
@@ -95,4 +95,9 @@ public interface AnnotatedAuroraAdmin extends AuroraAdmin.Iface {
   @Override
   Response pulseJobUpdate(
       @AuthorizingParam @Nullable JobUpdateKey key) throws TException;
+
+  @Override
+  Response rollbackJobUpdate(
+      @AuthorizingParam @Nullable JobUpdateKey key,
+      @Nullable String message) throws TException;
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index f8357c4..c2ec1b3 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -123,6 +123,26 @@ public interface JobUpdateController {
   void abort(IJobUpdateKey key, AuditData auditData) throws UpdateStateException;
 
   /**
+   * Rolls back an active job update.
+   * <p>
+   * This will roll back the update to its initial state, effectively 'undoing' it.
+   * The rollback is possible only if the update has not reached its terminal state yet
+   * and is in one of the following states:
+   * <ul>
+   *    <li>ROLLING_FORWARD</li>
+   *    <li>ROLL_BACK_PAUSED</li>
+   *    <li>ROLL_BACK_AWAITING_PULSE</li>
+   *    <li>ROLL_FORWARD_PAUSED</li>
+   *    <li>ROLL_FORWARD_AWAITING_PULSE</li>
+   * </ul>
+   *
+   * @param key Update to rollback.
+   * @param auditData Details about the origin of this state change.
+   * @throws UpdateStateException If pre-condition is not met.
+   */
+  void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException;
+
+  /**
   * Notifies the updater that the state of an instance has changed. A state change could also mean
    * deletion.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 364c5c7..594bb62 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -246,6 +246,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
         Functions.compose(createAuditedEvent(auditData), Functions.constant(ABORTED)));
   }
 
+  @Override
+  public void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException
{
+    unscopedChangeUpdateStatus(
+        key,
+        Functions.compose(createAuditedEvent(auditData), Functions.constant(ROLLING_BACK)));
+  }
+
   private static Function<JobUpdateStatus, JobUpdateEvent> createAuditedEvent(
       final AuditData auditData) {
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
index 7ab739a..959a5a5 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
@@ -70,9 +70,19 @@ final class JobUpdateStateMachine {
               ABORTED,
               ERROR,
               FAILED)
-          .putAll(ROLL_FORWARD_PAUSED, ROLLING_FORWARD, ROLL_FORWARD_AWAITING_PULSE, ABORTED,
ERROR)
+          .putAll(ROLL_FORWARD_PAUSED,
+              ROLLING_BACK,
+              ROLLING_FORWARD,
+              ROLL_FORWARD_AWAITING_PULSE,
+              ABORTED,
+              ERROR)
           .putAll(ROLL_BACK_PAUSED, ROLLING_BACK, ROLL_BACK_AWAITING_PULSE, ABORTED, ERROR)
-          .putAll(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD, ROLL_FORWARD_PAUSED, ABORTED,
ERROR)
+          .putAll(ROLL_FORWARD_AWAITING_PULSE,
+              ROLLING_BACK,
+              ROLLING_FORWARD,
+              ROLL_FORWARD_PAUSED,
+              ABORTED,
+              ERROR)
           .putAll(ROLL_BACK_AWAITING_PULSE, ROLLING_BACK, ROLL_BACK_PAUSED, ABORTED, ERROR)
           .build();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index ec2c786..9149c30 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -198,6 +198,17 @@ class AuroraClientAPI(object):
     """
     return self._scheduler_proxy.abortJobUpdate(update_key, message)
 
+  def rollback_job_update(self, update_key, message):
+    """Requests Scheduler to rollback active job update.
+
+    Arguments:
+    update_key -- Update identifier.
+    message -- Audit message to include with the change.
+
+    Returns response object.
+    """
+    return self._scheduler_proxy.rollbackJobUpdate(update_key, message)
+
   def get_job_update_diff(self, config, instances=None):
     """Requests scheduler to calculate difference between scheduler and client job views.
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/main/python/apache/aurora/client/cli/update.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/update.py b/src/main/python/apache/aurora/client/cli/update.py
index bb526f7..23aaa2c 100644
--- a/src/main/python/apache/aurora/client/cli/update.py
+++ b/src/main/python/apache/aurora/client/cli/update.py
@@ -76,11 +76,18 @@ class UpdateController(object):
     update_key = self.get_update_key(job_key)
     if update_key is None:
       self.context.print_err("No active update found for this job.")
-      return EXIT_INVALID_PARAMETER
+      return EXIT_INVALID_PARAMETER, update_key
     resp = mutate_fn(update_key)
     self.context.log_response_and_raise(resp, err_code=EXIT_API_ERROR, err_msg=error_msg)
     self.context.print_out(success_msg)
-    return EXIT_OK
+    return EXIT_OK, update_key
+
+  def rollback(self, job_key, message):
+    return self._modify_update(
+        job_key,
+        lambda key: self.api.rollback_job_update(key, message),
+        "Failed to rollback update due to error:",
+        "Update rollback has started.")
 
   def pause(self, job_key, message):
     return self._modify_update(
@@ -103,7 +110,6 @@ class UpdateController(object):
         "Failed to abort update due to error:",
         "Update has been aborted.")
 
-
 def format_timestamp(stamp_millis):
   return datetime.datetime.utcfromtimestamp(stamp_millis / 1000).isoformat()
 
@@ -116,16 +122,16 @@ MESSAGE_OPTION = CommandOption(
     help='Message to include with the update state transition')
 
 
+WAIT_OPTION = lambda help_msg: CommandOption(
+    '--wait',
+    default=False,
+    action='store_true',
+    help=help_msg)
+
 class StartUpdate(Verb):
 
   UPDATE_MSG_TEMPLATE = "Job update has started. View your update progress at %s"
 
-  WAIT_OPTION = CommandOption(
-      '--wait',
-      default=False,
-      action='store_true',
-      help='Wait until the update completes')
-
   def __init__(self, clock=time):
     self._clock = clock
 
@@ -143,7 +149,7 @@ class StartUpdate(Verb):
         STRICT_OPTION,
         INSTANCES_SPEC_ARGUMENT,
         CONFIG_ARGUMENT,
-        self.WAIT_OPTION
+        WAIT_OPTION('Wait until the update completes')
     ]
 
   @property
@@ -189,14 +195,26 @@ class StartUpdate(Verb):
       context.print_out(self.UPDATE_MSG_TEMPLATE % url)
 
       if context.options.wait:
-        return wait_for_update(context, self._clock, api, update_key)
+        return wait_for_update(context, self._clock, api, update_key, update_state_to_err_code)
     else:
       context.print_out(combine_messages(resp))
 
     return EXIT_OK
 
 
-def wait_for_update(context, clock, api, update_key):
+def rollback_state_to_err_code(state):
+    return (EXIT_OK if state == JobUpdateStatus.ROLLED_BACK else
+            EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_FORWARD else
+            EXIT_UNKNOWN_ERROR)
+
+
+def update_state_to_err_code(state):
+    return (EXIT_OK if state == JobUpdateStatus.ROLLED_FORWARD else
+            EXIT_COMMAND_FAILURE if state == JobUpdateStatus.ROLLED_BACK else
+            EXIT_UNKNOWN_ERROR)
+
+
+def wait_for_update(context, clock, api, update_key, state_to_err_code_func):
   cur_state = None
 
   while True:
@@ -209,12 +227,7 @@ def wait_for_update(context, clock, api, update_key):
         cur_state = new_state
         context.print_out('Current state %s' % JobUpdateStatus._VALUES_TO_NAMES[cur_state])
         if cur_state not in ACTIVE_JOB_UPDATE_STATES:
-          if cur_state == JobUpdateStatus.ROLLED_FORWARD:
-            return EXIT_OK
-          elif cur_state == JobUpdateStatus.ROLLED_BACK:
-            return EXIT_COMMAND_FAILURE
-          else:
-            return EXIT_UNKNOWN_ERROR
+          return state_to_err_code_func(cur_state)
       clock.sleep(5)
     elif len(summaries) == 0:
       raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job update not found.')
@@ -252,7 +265,9 @@ class UpdateWait(Verb):
         context,
         self._clock,
         context.get_api(context.options.jobspec.cluster),
-        JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id))
+        JobUpdateKey(job=context.options.jobspec.to_thrift(), id=context.options.id),
+        update_state_to_err_code
+    )
 
 
 class PauseUpdate(Verb):
@@ -269,9 +284,10 @@ class PauseUpdate(Verb):
 
   def execute(self, context):
     job_key = context.options.jobspec
-    return UpdateController(context.get_api(job_key.cluster), context).pause(
+    err_code, _ = UpdateController(context.get_api(job_key.cluster), context).pause(
         job_key,
         context.options.message)
+    return err_code
 
 
 class ResumeUpdate(Verb):
@@ -288,9 +304,10 @@ class ResumeUpdate(Verb):
 
   def execute(self, context):
     job_key = context.options.jobspec
-    return UpdateController(context.get_api(job_key.cluster), context).resume(
+    err_code, _ = UpdateController(context.get_api(job_key.cluster), context).resume(
         job_key,
         context.options.message)
+    return err_code
 
 
 class AbortUpdate(Verb):
@@ -307,9 +324,44 @@ class AbortUpdate(Verb):
 
   def execute(self, context):
     job_key = context.options.jobspec
-    return UpdateController(context.get_api(job_key.cluster), context).abort(
+    err_code, _ = UpdateController(context.get_api(job_key.cluster), context).abort(
         job_key,
         context.options.message)
+    return err_code
+
+
+class RollbackUpdate(Verb):
+  def __init__(self, clock=time):
+    self._clock = clock
+
+  @property
+  def name(self):
+    return 'rollback'
+
+  def get_options(self):
+    return [JOBSPEC_ARGUMENT, MESSAGE_OPTION, WAIT_OPTION('Wait until the update rolls back')]
+
+  @property
+  def help(self):
+    return 'Rollback an in-progress update.'
+
+  def execute(self, context):
+    job_key = context.options.jobspec
+    update_controller = UpdateController(
+      context.get_api(job_key.cluster), context)
+
+    err_code, update_key = update_controller.rollback(
+      job_key, context.options.message)
+    if err_code == EXIT_OK and context.options.wait:
+      return wait_for_update(
+        context,
+        self._clock,
+        context.get_api(job_key.cluster),
+        update_key,
+        rollback_state_to_err_code
+      )
+
+    return err_code
 
 
 UpdateFilter = namedtuple('UpdateFilter', ['cluster', 'role', 'env', 'job'])
@@ -554,6 +606,7 @@ class Update(Noun):
     self.register_verb(PauseUpdate())
     self.register_verb(ResumeUpdate())
     self.register_verb(AbortUpdate())
+    self.register_verb(RollbackUpdate())
     self.register_verb(ListUpdates())
     self.register_verb(UpdateInfo())
     self.register_verb(UpdateWait())

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
index 8d78bae..3522dcc 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
@@ -60,11 +60,13 @@ public class JobUpdateStateMachineTest {
           .put(Pair.of(ROLL_FORWARD_PAUSED, ROLL_FORWARD_AWAITING_PULSE), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_PAUSED, ABORTED), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_PAUSED, ERROR), STOP_WATCHING)
+          .put(Pair.of(ROLL_FORWARD_PAUSED, ROLLING_BACK), ROLL_BACK)
           .put(Pair.of(ROLL_BACK_PAUSED, ROLLING_BACK), ROLL_BACK)
           .put(Pair.of(ROLL_BACK_PAUSED, ROLL_BACK_AWAITING_PULSE), STOP_WATCHING)
           .put(Pair.of(ROLL_BACK_PAUSED, ABORTED), STOP_WATCHING)
           .put(Pair.of(ROLL_BACK_PAUSED, ERROR), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_FORWARD), ROLL_FORWARD)
+          .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLLING_BACK), ROLL_BACK)
           .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ROLL_FORWARD_PAUSED), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ABORTED), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_AWAITING_PULSE, ERROR), STOP_WATCHING)

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index e157c0d..04551f1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -263,6 +263,11 @@ public class JobUpdaterIT extends EasyMockTest {
         storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(UPDATE_ID).get());
   }
 
+  private IJobUpdateDetails getDetails(IJobUpdateKey key) {
+    return storage.read(
+        storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key).get());
+  }
+
   private void assertLatestUpdateMessage(String expected) {
     IJobUpdateDetails details = getDetails();
     assertEquals(expected, Iterables.getLast(details.getUpdateEvents()).getMessage());
@@ -272,7 +277,15 @@ public class JobUpdaterIT extends EasyMockTest {
       JobUpdateStatus expected,
       Multimap<Integer, JobUpdateAction> expectedActions) {
 
-    IJobUpdateDetails details = getDetails();
+    assertStateUpdate(UPDATE_ID, expected, expectedActions);
+  }
+
+  private void assertStateUpdate(
+      IJobUpdateKey key,
+      JobUpdateStatus expected,
+      Multimap<Integer, JobUpdateAction> expectedActions) {
+
+    IJobUpdateDetails details = getDetails(key);
     Iterable<IJobInstanceUpdateEvent> orderedEvents =
         EVENT_ORDER.sortedCopy(details.getInstanceEvents());
     Multimap<Integer, IJobInstanceUpdateEvent> eventsByInstance =
@@ -1365,15 +1378,214 @@ public class JobUpdaterIT extends EasyMockTest {
     updater.resume(UPDATE_ID, AUDIT);
   }
 
-  private static IJobUpdateSummary makeUpdateSummary() {
+  @Test
+  public void testFailToRollbackCompletedUpdate() throws Exception {
+    expectTaskKilled().times(3);
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
+    builder.getInstructions().getSettings()
+        .setWaitForBatchCompletion(true)
+        .setUpdateGroupSize(2);
+    IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    // Instances 0 and 1 are updated.
+    updater.start(update, AUDIT);
+    actions.putAll(0, INSTANCE_UPDATING)
+        .putAll(1, INSTANCE_UPDATING);
+    assertState(ROLLING_FORWARD, actions.build());
+    changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
+    changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
+
+    // Instance 1 finished first, but update does not yet proceed until 0 finishes.
+    actions.putAll(1, INSTANCE_UPDATED);
+    assertState(ROLLING_FORWARD, actions.build());
+    clock.advance(WATCH_TIMEOUT);
+    actions.putAll(0, INSTANCE_UPDATED);
+
+    // Instance 2 is updated.
+    changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    actions.putAll(2, INSTANCE_UPDATING, INSTANCE_UPDATED);
+    assertState(ROLLED_FORWARD, actions.build());
+
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, NEW_CONFIG, 1, NEW_CONFIG, 2, NEW_CONFIG));
+
+    try {
+      updater.rollback(UPDATE_ID, AUDIT);
+      fail();
+    } catch (UpdateStateException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testRollbackDuringUpgrade() throws Exception {
+    expectTaskKilled().times(5);
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
+    builder.getInstructions().getSettings()
+        .setWaitForBatchCompletion(true)
+        .setUpdateGroupSize(2);
+    IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+
+    // Instances 0 and 1 are updated.
+    updater.start(update, AUDIT);
+    actions.putAll(0, INSTANCE_UPDATING)
+        .putAll(1, INSTANCE_UPDATING);
+    assertState(ROLLING_FORWARD, actions.build());
+    changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    actions.putAll(0, INSTANCE_UPDATED)
+        .putAll(1, INSTANCE_UPDATED)
+        .putAll(2, INSTANCE_UPDATING);
+    assertState(ROLLING_FORWARD, actions.build());
+    clock.advance(WATCH_TIMEOUT);
+
+    updater.rollback(UPDATE_ID, AUDIT);
+
+    actions.putAll(1, INSTANCE_ROLLING_BACK);
+    actions.putAll(2, INSTANCE_ROLLING_BACK);
+    changeState(JOB, 1, KILLED);
+    changeState(JOB, 2, KILLED);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertState(ROLLING_BACK, actions.build());
+    clock.advance(WATCH_TIMEOUT);
+
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    actions.putAll(2, INSTANCE_ROLLED_BACK)
+        .putAll(1, INSTANCE_ROLLED_BACK);
+    changeState(JOB, 0, KILLED);
+    actions.putAll(0, INSTANCE_ROLLING_BACK);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertState(ROLLING_BACK, actions.build());
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    actions.putAll(0, INSTANCE_ROLLED_BACK);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertState(ROLLED_BACK, actions.build());
+
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, OLD_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG));
+  }
+
+  @Test
+  public void testRollbackCoordinatedUpdate() throws Exception {
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(
+        // No-op - task is already matching the new config.
+        makeInstanceConfig(0, 0, NEW_CONFIG),
+        // Tasks needing update.
+        makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+    builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+    insertInitialTasks(IJobUpdate.build(builder));
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+    updater.start(IJobUpdate.build(builder), AUDIT);
+
+    // The update is blocked initially waiting for a pulse.
+    assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+
+    updater.rollback(UPDATE_ID, AUDIT);
+
+    clock.advance(WATCH_TIMEOUT);
+    assertState(ROLLED_BACK, actions.build());
+  }
+
+  @Test
+  public void testRollbackPausedForwardUpdate() throws Exception {
+    expectTaskKilled().times(2);
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(
+        // No-op - task is already matching the new config.
+        makeInstanceConfig(0, 0, NEW_CONFIG),
+        // Tasks needing update.
+        makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+    insertInitialTasks(IJobUpdate.build(builder));
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+    updater.start(IJobUpdate.build(builder), AUDIT);
+
+    actions.putAll(1, INSTANCE_UPDATING);
+    assertState(ROLLING_FORWARD, actions.build());
+    clock.advance(WATCH_TIMEOUT);
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+
+    updater.pause(UPDATE_ID, AUDIT);
+    assertState(ROLL_FORWARD_PAUSED, actions.build());
+
+    updater.rollback(UPDATE_ID, AUDIT);
+
+    actions.putAll(1, INSTANCE_ROLLING_BACK);
+    clock.advance(WATCH_TIMEOUT);
+    assertState(ROLLING_BACK, actions.build());
+
+    actions.putAll(1, INSTANCE_ROLLED_BACK);
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    assertState(ROLLED_BACK, actions.build());
+
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG, 2, OLD_CONFIG));
+  }
+
+  private static IJobUpdateSummary makeUpdateSummary(IJobUpdateKey key) {
     return IJobUpdateSummary.build(new JobUpdateSummary()
         .setUser("user")
-        .setKey(UPDATE_ID.newBuilder()));
+        .setKey(key.newBuilder()));
   }
 
   private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... configs) {
     JobUpdate builder = new JobUpdate()
-        .setSummary(makeUpdateSummary().newBuilder())
+        .setSummary(makeUpdateSummary(UPDATE_ID).newBuilder())
         .setInstructions(new JobUpdateInstructions()
             .setDesiredState(new InstanceTaskConfig()
                 .setTask(NEW_CONFIG.newBuilder())

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index afbd385..afac250 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -183,6 +183,11 @@ class TestSchedulerProxyInjection(unittest.TestCase):
     self.mox.ReplayAll()
     self.make_scheduler_proxy().abortJobUpdate('update_id')
 
+  def test_rollbackJobUpdate(self):
+    self.mock_thrift_client.rollbackJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
+    self.mox.ReplayAll()
+    self.make_scheduler_proxy().rollbackJobUpdate('update_id')
+
   def test_pulseJobUpdate(self):
     self.mock_thrift_client.pulseJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
     self.mox.ReplayAll()

http://git-wip-us.apache.org/repos/asf/aurora/blob/a4fdf284/src/test/python/apache/aurora/client/cli/test_supdate.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_supdate.py b/src/test/python/apache/aurora/client/cli/test_supdate.py
index 317b175..92fb039 100644
--- a/src/test/python/apache/aurora/client/cli/test_supdate.py
+++ b/src/test/python/apache/aurora/client/cli/test_supdate.py
@@ -34,6 +34,7 @@ from apache.aurora.client.cli.update import (
     ListUpdates,
     PauseUpdate,
     ResumeUpdate,
+    RollbackUpdate,
     StartUpdate,
     UpdateFilter,
     UpdateInfo,
@@ -705,3 +706,88 @@ class TestUpdateWait(AuroraClientCommandTest):
 
     assert self._fake_context.get_out() == []
     assert self._mock_api.query_job_updates.mock_calls == [self._fetch_call]
+
+
+class TestRollbackUpdate(AuroraClientCommandTest):
+  def setUp(self):
+    self._command = RollbackUpdate()
+    self._mock_options = mock_verb_options(self._command)
+    self._mock_options.jobspec = self.TEST_JOBKEY
+    self._mock_options.wait = False
+    self._fake_context = FakeAuroraCommandContext()
+    self._fake_context.set_options(self._mock_options)
+    self._mock_api = self._fake_context.get_api('UNUSED')
+
+  def test_rollback_update_command_line_succeeds(self):
+    self._mock_api.query_job_updates.return_value = get_status_query_response()
+    self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+    self._mock_options.message = 'hello'
+    assert self._command.execute(self._fake_context) == EXIT_OK
+
+    assert self._mock_api.query_job_updates.mock_calls == [
+        call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+    assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, 'hello')]
+    assert self._fake_context.get_out() == ["Update rollback has started."]
+    assert self._fake_context.get_err() == []
+
+  def test_rollback_update_command_line_error(self):
+    self._mock_api.query_job_updates.return_value = get_status_query_response()
+    self._mock_api.rollback_job_update.return_value = self.create_error_response()
+
+    with pytest.raises(Context.CommandError):
+      self._command.execute(self._fake_context)
+    assert self._mock_api.query_job_updates.mock_calls == [
+        call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+    assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)]
+
+    assert self._fake_context.get_out() == []
+    assert self._fake_context.get_err() == ["Failed to rollback update due to error:", "\tWhoops"]
+
+  def test_rollback_invalid_api_response(self):
+    # Mimic the API returning two active updates for one job, which should be impossible.
+    self._mock_api.query_job_updates.return_value = get_status_query_response(count=2)
+    self._mock_api.rollback_job_update.return_value = self.create_error_response()
+    with pytest.raises(Context.CommandError) as error:
+      self._command.execute(self._fake_context)
+      assert error.message == (
+        'scheduler returned multiple active updates for this job.')
+
+    assert self._mock_api.query_job_updates.mock_calls == [
+        call(update_statuses=ACTIVE_JOB_UPDATE_STATES, job_key=self.TEST_JOBKEY)]
+    assert self._mock_api.rollback_job_update.mock_calls == []
+    assert self._fake_context.get_out() == []
+    assert self._fake_context.get_err() == []
+
+  def test_rollback_and_wait_success(self):
+    self._mock_options.wait = True
+
+    updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+    updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_BACK)
+
+    self._mock_api.query_job_updates.side_effect = [updating_response, updated_response]
+    self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+
+    assert self._command.execute(self._fake_context) == EXIT_OK
+    assert self._mock_api.rollback_job_update.mock_calls == [call(UPDATE_KEY, None)]
+    assert self._fake_context.get_err() == []
+
+  def test_rollback_and_wait_rolled_forward(self):
+    self._mock_options.wait = True
+
+    updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+    updated_response = get_status_query_response(status=JobUpdateStatus.ROLLED_FORWARD)
+
+    self._mock_api.query_job_updates.side_effect = [updating_response, updated_response]
+
+    self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+    assert self._command.execute(self._fake_context) == EXIT_COMMAND_FAILURE
+
+  def test_rollback_and_wait_error(self):
+    self._mock_options.wait = True
+    updating_response = get_status_query_response(status=JobUpdateStatus.ROLLING_FORWARD)
+    failed_response = get_status_query_response(status=JobUpdateStatus.ERROR)
+
+    self._mock_api.query_job_updates.side_effect = [updating_response, failed_response]
+
+    self._mock_api.rollback_job_update.return_value = self.create_simple_success_response()
+    assert self._command.execute(self._fake_context) == EXIT_UNKNOWN_ERROR


Mime
View raw message