aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Surface instance update status changes so they may be persisted.
Date Thu, 18 Sep 2014 17:20:59 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master d5d52a3fd -> 9fd87234e


Surface instance update status changes so they may be persisted.

Reviewed at https://reviews.apache.org/r/25753/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9fd87234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9fd87234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9fd87234

Branch: refs/heads/master
Commit: 9fd87234ef62e9c62b0140f4f7f90e09c73fa8a9
Parents: d5d52a3
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Sep 18 10:13:54 2014 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Sep 18 10:13:54 2014 -0700

----------------------------------------------------------------------
 .../scheduler/updater/InstanceUpdater.java      |   2 -
 .../updater/JobUpdateControllerImpl.java        |  42 +++---
 .../scheduler/updater/OneWayJobUpdater.java     | 139 ++++++++-----------
 .../aurora/scheduler/updater/SideEffect.java    | 118 ++++++++++++++++
 .../scheduler/updater/StateEvaluator.java       |  26 +++-
 .../aurora/scheduler/updater/UpdateFactory.java |  29 ++--
 .../aurora/scheduler/updater/EnumsTest.java     |   2 +-
 .../scheduler/updater/OneWayJobUpdaterTest.java |  59 +++++---
 8 files changed, 271 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index dc11abd..13599c0 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -45,8 +45,6 @@ import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDE
  * In part of a job update, this manages the update of an individual instance. This includes
  * deciding how to effect an update from a possibly-absent old configuration to a possibly-absent
  * new configuration, and detecting whether a replaced instance becomes unstable.
- *
- * TODO(wfarner): This probably needs to be parameterized so that it may be reused for rollbacks.
  */
 class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
   private static final Logger LOG = Logger.getLogger(InstanceUpdater.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 159b09e..33bf7c7 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -22,7 +22,6 @@ import java.util.logging.Logger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -437,32 +436,37 @@ class JobUpdateControllerImpl implements JobUpdateController {
     LOG.info(JobKeys.canonicalString(job) + " evaluation result: " + result);
     OneWayStatus status = result.getStatus();
     if (status == SUCCEEDED || status == OneWayStatus.FAILED) {
-      Preconditions.checkArgument(
-          result.getInstanceActions().isEmpty(),
-          "A terminal state should not specify actions.");
+      if (SideEffect.hasActions(result.getSideEffects().values())) {
+        throw new IllegalArgumentException(
+            "A terminal state should not specify actions: " + result);
+      }
 
       if (status == SUCCEEDED) {
         changeUpdateStatus(updateStore, taskStore, summary, update.getSuccessStatus());
       } else {
         changeUpdateStatus(updateStore, taskStore, summary, update.getFailureStatus());
       }
-    } else if (result.getInstanceActions().isEmpty()) {
-      LOG.info("No actions to perform at this time for update of " + job);
     } else {
-      for (Map.Entry<Integer, InstanceAction> entry : result.getInstanceActions().entrySet()) {
+      LOG.info("Executing side-effects for update of " + job + ": " + result.getSideEffects());
+      for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
         IInstanceKey instance = InstanceKeys.from(job, entry.getKey());
-        Optional<InstanceActionHandler> handler = entry.getValue().getHandler();
-        if (handler.isPresent()) {
-          Amount<Long, Time> reevaluateDelay = handler.get().getReevaluationDelay(
-              instance,
-              updateStore.fetchJobUpdateConfiguration(summary.getUpdateId()).get(),
-              taskStore,
-              stateManager,
-              updaterStatus);
-          executor.schedule(
-              getDeferredEvaluator(instance, summary.getUpdateId()),
-              reevaluateDelay.getValue(),
-              reevaluateDelay.getUnit().getTimeUnit());
+        // TODO(wfarner): Persist SideEffect.getStatusChanges as JobInstanceUpdateEvents.
+
+        Optional<InstanceAction> action = entry.getValue().getAction();
+        if (action.isPresent()) {
+          Optional<InstanceActionHandler> handler = action.get().getHandler();
+          if (handler.isPresent()) {
+            Amount<Long, Time> reevaluateDelay = handler.get().getReevaluationDelay(
+                instance,
+                updateStore.fetchJobUpdateConfiguration(summary.getUpdateId()).get(),
+                taskStore,
+                stateManager,
+                updaterStatus);
+            executor.schedule(
+                getDeferredEvaluator(instance, summary.getUpdateId()),
+                reevaluateDelay.getValue(),
+                reevaluateDelay.getUnit().getTimeUnit());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
index 5a9af39..1d0bbfe 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
@@ -20,7 +20,6 @@ import java.util.logging.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
@@ -35,6 +34,10 @@ import static java.util.Objects.requireNonNull;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus.FAILED;
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus.IDLE;
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus.SUCCEEDED;
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus.WORKING;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
 
 /**
@@ -88,38 +91,23 @@ class OneWayJobUpdater<K, T> {
         }));
   }
 
-  private static final Function<InstanceUpdate<?>, InstanceUpdateStatus> GET_STATE =
-      new Function<InstanceUpdate<?>, InstanceUpdateStatus>() {
+  private static final Function<InstanceUpdate<?>, SideEffect.InstanceUpdateStatus> GET_STATE =
+      new Function<InstanceUpdate<?>, SideEffect.InstanceUpdateStatus>() {
         @Override
-        public InstanceUpdateStatus apply(InstanceUpdate<?> manager) {
+        public SideEffect.InstanceUpdateStatus apply(InstanceUpdate<?> manager) {
           return manager.getState();
         }
       };
 
   private static <K, T> Set<K> filterByStatus(
       Map<K, InstanceUpdate<T>> instances,
-      InstanceUpdateStatus status) {
+      SideEffect.InstanceUpdateStatus status) {
 
     return ImmutableSet.copyOf(
         Maps.filterValues(instances, Predicates.compose(Predicates.equalTo(status), GET_STATE))
             .keySet());
   }
 
-  private static Optional<InstanceAction> resultToAction(Result result) {
-    switch (result) {
-      case EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.AWAIT_STATE_CHANGE);
-      case REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.ADD_TASK);
-      case KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.KILL_TASK);
-      case EVALUATE_AFTER_MIN_RUNNING_MS:
-        return Optional.of(InstanceAction.WATCH_TASK);
-      default:
-        return Optional.absent();
-    }
-  }
-
   @VisibleForTesting
   Set<K> getInstances() {
     return ImmutableSet.copyOf(instances.keySet());
@@ -162,7 +150,7 @@ class OneWayJobUpdater<K, T> {
 
     // Call order is important here: update on-demand instances, evaluate new instances, compute
     // job update state.
-    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.<K, InstanceAction>builder()
+    ImmutableMap.Builder<K, SideEffect> actions = ImmutableMap.<K, SideEffect>builder()
         // Re-evaluate instances that are in need of update.
         .putAll(evaluateInstances(instancesNeedingUpdate));
 
@@ -171,64 +159,56 @@ class OneWayJobUpdater<K, T> {
       actions.putAll(startNextInstanceGroup(stateProvider));
     }
 
-    return new EvaluationResult<K>(computeJobUpdateStatus(), actions.build());
+    return new EvaluationResult<>(computeJobUpdateStatus(), actions.build());
   }
 
-  private Map<K, InstanceAction> evaluateInstances(Map<K, T> updatedInstances) {
-    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
+  private Map<K, SideEffect> evaluateInstances(Map<K, T> updatedInstances) {
+    ImmutableMap.Builder<K, SideEffect> sideEffects = ImmutableMap.builder();
     for (Map.Entry<K, T> entry : updatedInstances.entrySet()) {
       K instanceId = entry.getKey();
       InstanceUpdate<T> update = instances.get(instanceId);
       // Suppress state changes for updates that are not in-progress.
-      if (update.getState() == InstanceUpdateStatus.WORKING) {
-        Optional<InstanceAction> action = resultToAction(update.evaluate(entry.getValue()));
-        if (action.isPresent()) {
-          actions.put(instanceId, action.get());
-        }
+      if (update.getState() == WORKING) {
+        sideEffects.put(instanceId, update.evaluate(entry.getValue()));
       } else {
         LOG.info("Ignoring state change for instance outside working set: " + instanceId);
       }
     }
 
-    return actions.build();
+    return sideEffects.build();
   }
 
-  private Map<K, InstanceAction> startNextInstanceGroup(InstanceStateProvider<K, T> stateProvider) {
-    Set<K> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+  private Map<K, SideEffect> startNextInstanceGroup(InstanceStateProvider<K, T> stateProvider) {
+    Set<K> idle = filterByStatus(instances, IDLE);
     if (idle.isEmpty()) {
       return ImmutableMap.of();
     } else {
-      ImmutableMap.Builder<K, InstanceAction> builder = ImmutableMap.builder();
-      Set<K> working = filterByStatus(instances, InstanceUpdateStatus.WORKING);
+      ImmutableMap.Builder<K, SideEffect> builder = ImmutableMap.builder();
+      Set<K> working = filterByStatus(instances, WORKING);
       Set<K> nextGroup = strategy.getNextGroup(idle, working);
       if (!nextGroup.isEmpty()) {
         for (K instance : nextGroup) {
-          Result result = instances.get(instance).evaluate(stateProvider.getState(instance));
-          Optional<InstanceAction> action = resultToAction(result);
-          if (action.isPresent()) {
-            builder.put(instance, action.get());
-          }
+          builder.put(instance, instances.get(instance).evaluate(stateProvider.getState(instance)));
         }
-        LOG.info("Updated working set for update to "
-            + filterByStatus(instances, InstanceUpdateStatus.WORKING));
+        LOG.info("Changed working set for update to "
+            + filterByStatus(instances, WORKING));
       }
 
-      Map<K, InstanceAction> actions = builder.build();
-      if (!idle.isEmpty() && working.isEmpty() && actions.isEmpty()) {
+      Map<K, SideEffect> sideEffects = builder.build();
+      if (!idle.isEmpty() && working.isEmpty() && !SideEffect.hasActions(sideEffects.values())) {
         // There's no in-flight instances, and no actions - so there's nothing left to initiate more
         // work on this job. Try to find more work, or converge.
-        return startNextInstanceGroup(stateProvider);
+        return builder.putAll(startNextInstanceGroup(stateProvider)).build();
       } else {
-        return actions;
+        return sideEffects;
       }
     }
   }
 
   private OneWayStatus computeJobUpdateStatus() {
-    Set<K> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
-    Set<K> working = filterByStatus(instances, InstanceUpdateStatus.WORKING);
-    Set<K> failed = filterByStatus(instances, InstanceUpdateStatus.FAILED);
-    // TODO(wfarner): This needs to be updated to support rollback.
+    Set<K> idle = filterByStatus(instances, IDLE);
+    Set<K> working = filterByStatus(instances, WORKING);
+    Set<K> failed = filterByStatus(instances, FAILED);
     if (failed.size() > maxFailedInstances) {
       stateMachine.transition(OneWayStatus.FAILED);
     } else if (working.isEmpty() && idle.isEmpty()) {
@@ -243,49 +223,44 @@ class OneWayJobUpdater<K, T> {
    */
   private static class InstanceUpdate<T> {
     private final StateEvaluator<T> evaluator;
-    private final StateMachine<InstanceUpdateStatus> stateMachine;
+    private final StateMachine<SideEffect.InstanceUpdateStatus> stateMachine;
 
     InstanceUpdate(String name, StateEvaluator<T> evaluator) {
       this.evaluator = requireNonNull(evaluator);
-      stateMachine = StateMachine.<InstanceUpdateStatus>builder(name)
-          .initialState(InstanceUpdateStatus.IDLE)
-          .addState(InstanceUpdateStatus.IDLE, InstanceUpdateStatus.WORKING)
-          .addState(
-              InstanceUpdateStatus.WORKING,
-              InstanceUpdateStatus.SUCCEEDED,
-              InstanceUpdateStatus.FAILED)
-          .addState(InstanceUpdateStatus.SUCCEEDED)
-          .addState(InstanceUpdateStatus.FAILED)
+      stateMachine = StateMachine.<SideEffect.InstanceUpdateStatus>builder(name)
+          .initialState(IDLE)
+          .addState(IDLE, WORKING)
+          .addState(WORKING, SUCCEEDED, FAILED)
+          .addState(SUCCEEDED)
+          .addState(FAILED)
           .throwOnBadTransition(true)
           .logTransitions()
           .build();
     }
 
-    InstanceUpdateStatus getState() {
+    SideEffect.InstanceUpdateStatus getState() {
       return stateMachine.getState();
     }
 
-    Result evaluate(T actualState) {
-      if (stateMachine.getState() == InstanceUpdateStatus.IDLE) {
-        stateMachine.transition(InstanceUpdateStatus.WORKING);
+    SideEffect evaluate(T actualState) {
+      ImmutableSet.Builder<SideEffect.InstanceUpdateStatus> statusChanges = ImmutableSet.builder();
+
+      if (stateMachine.getState() == IDLE) {
+        stateMachine.transition(WORKING);
+        statusChanges.add(WORKING);
       }
 
       Result result = evaluator.evaluate(actualState);
       if (result == Result.SUCCEEDED) {
-        stateMachine.transition(InstanceUpdateStatus.SUCCEEDED);
+        stateMachine.transition(SUCCEEDED);
+        statusChanges.add(SUCCEEDED);
       } else if (result == Result.FAILED) {
-        stateMachine.transition(InstanceUpdateStatus.FAILED);
+        stateMachine.transition(FAILED);
+        statusChanges.add(FAILED);
       }
-      return result;
-    }
-  }
 
-  @VisibleForTesting
-  enum InstanceUpdateStatus {
-    IDLE,
-    WORKING,
-    SUCCEEDED,
-    FAILED
+      return new SideEffect(result.getAction(), statusChanges.build());
+    }
   }
 
   /**
@@ -303,19 +278,19 @@ class OneWayJobUpdater<K, T> {
    */
   static class EvaluationResult<K> {
     private final OneWayStatus status;
-    private final Map<K, InstanceAction> instanceActions;
+    private final Map<K, SideEffect> sideEffects;
 
-    EvaluationResult(OneWayStatus jobStatus, Map<K, InstanceAction> instanceActions) {
+    EvaluationResult(OneWayStatus jobStatus, Map<K, SideEffect> sideEffects) {
       this.status = requireNonNull(jobStatus);
-      this.instanceActions = requireNonNull(instanceActions);
+      this.sideEffects = requireNonNull(sideEffects);
     }
 
     public OneWayStatus getStatus() {
       return status;
     }
 
-    public Map<K, InstanceAction> getInstanceActions() {
-      return instanceActions;
+    public Map<K, SideEffect> getSideEffects() {
+      return sideEffects;
     }
 
     @Override
@@ -326,19 +301,19 @@ class OneWayJobUpdater<K, T> {
       @SuppressWarnings("unchecked")
       EvaluationResult<K> other = (EvaluationResult<K>) obj;
       return other.getStatus().equals(this.getStatus())
-          && other.getInstanceActions().equals(this.getInstanceActions());
+          && other.getSideEffects().equals(this.getSideEffects());
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(getStatus(), getInstanceActions());
+      return Objects.hash(getStatus(), getSideEffects());
     }
 
     @Override
     public String toString() {
       return com.google.common.base.Objects.toStringHelper(this)
           .add("status", getStatus())
-          .add("instanceActions", getInstanceActions())
+          .add("sideEffects", getSideEffects())
           .toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
new file mode 100644
index 0000000..d9e59b6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Side-effect resulting from evaluating an instance during a job update.  Side-effects include
+ * actions that should be performed as well as status changes of the instance monitor.
+ */
+public class SideEffect {
+  private final Optional<InstanceAction> action;
+  private final Set<InstanceUpdateStatus> statusChanges;
+
+  /**
+   * Creates a new side-effect.
+   *
+   * @param action Action to be taken on the instance, if necessary.
+   * @param statusChanges Any status changes to the instance monitor.
+   */
+  public SideEffect(
+      Optional<InstanceAction> action,
+      Set<InstanceUpdateStatus> statusChanges) {
+
+    this.action = requireNonNull(action);
+    this.statusChanges = requireNonNull(statusChanges);
+  }
+
+  public Optional<InstanceAction> getAction() {
+    return action;
+  }
+
+  public Set<InstanceUpdateStatus> getStatusChanges() {
+    return statusChanges;
+  }
+
+  /**
+   * Tests whether any of multiple side-effects contain {@link #getAction() actions} to be
+   * performed.
+   *
+   * @param sideEffects Side-effects to inspect for actions.
+   * @return {@code true} if at least one of the side-effects contains an action to perform,
+   *         otherwise {@code false}.
+   */
+  public static boolean hasActions(Iterable<SideEffect> sideEffects) {
+    for (SideEffect sideEffect : sideEffects) {
+      if (sideEffect.getAction().isPresent()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equals(other.getAction(), getAction())
+        && Objects.equals(other.getStatusChanges(), getStatusChanges());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getAction(), getStatusChanges());
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("action", getAction())
+        .add("statusChanges", getStatusChanges())
+        .toString();
+  }
+
+  /**
+   * The status of an instance being updated as part of a one-way job update.
+   */
+  public enum InstanceUpdateStatus {
+    /**
+     * The instance has not yet been modified by the update.
+     */
+    IDLE,
+
+    /**
+     * The instance is being modified to reach the target state of the one-way update.
+     */
+    WORKING,
+
+    /**
+     * The instance updated successfully and is no longer being monitored.
+     */
+    SUCCEEDED,
+
+    /**
+     * The instance failed to update and is no longer being monitored.
+     */
+    FAILED
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
index 453daeb..66b0e4b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
@@ -13,6 +13,10 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Objects;
+
+import com.google.common.base.Optional;
+
 /**
  * Determines actions that must be taken to change the configuration of a running task.
  * <p>
@@ -44,11 +48,21 @@ interface StateEvaluator<T> {
   Result evaluate(T actualState);
 
   enum Result {
-    EVALUATE_ON_STATE_CHANGE,
-    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    EVALUATE_AFTER_MIN_RUNNING_MS,
-    SUCCEEDED,
-    FAILED
+    EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.AWAIT_STATE_CHANGE)),
+    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.ADD_TASK)),
+    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.KILL_TASK)),
+    EVALUATE_AFTER_MIN_RUNNING_MS(Optional.of(InstanceAction.WATCH_TASK)),
+    SUCCEEDED(Optional.<InstanceAction>absent()),
+    FAILED(Optional.<InstanceAction>absent());
+
+    private final Optional<InstanceAction> action;
+
+    Result(Optional<InstanceAction> action) {
+      this.action = Objects.requireNonNull(action);
+    }
+
+    public Optional<InstanceAction> getAction() {
+      return action;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
index 82f2b6d..029c105 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -136,12 +136,12 @@ interface UpdateFactory {
                 clock));
       }
 
-      // TODO(wfarner): Add the batch_completion flag to JobUpdateSettings and pick correct
-      // strategy.
       Ordering<Integer> updateOrder = rollingForward
           ? Ordering.<Integer>natural()
           : Ordering.<Integer>natural().reverse();
 
+      // TODO(wfarner): Add the batch_completion flag to JobUpdateSettings and pick correct
+      // strategy.
       UpdateStrategy<Integer> strategy =
           new QueueStrategy<>(updateOrder, settings.getUpdateGroupSize());
 
@@ -150,8 +150,7 @@ interface UpdateFactory {
               strategy,
               settings.getMaxFailedInstances(),
               evaluators.build()),
-          rollingForward ? JobUpdateStatus.ROLLED_FORWARD : JobUpdateStatus.ROLLED_BACK,
-          rollingForward ? JobUpdateStatus.ROLLING_BACK : JobUpdateStatus.FAILED);
+          rollingForward);
     }
 
     private static Range<Integer> toRange(IRange range) {
@@ -197,29 +196,23 @@ interface UpdateFactory {
 
   class Update {
     private final OneWayJobUpdater<Integer, Optional<IScheduledTask>> updater;
-    private final JobUpdateStatus successStatus;
-    private final JobUpdateStatus failureStatus;
-
-    public Update(
-        OneWayJobUpdater<Integer, Optional<IScheduledTask>> updater,
-        JobUpdateStatus successStatus,
-        JobUpdateStatus failureStatus) {
+    private final boolean rollingForward;
 
+    Update(OneWayJobUpdater<Integer, Optional<IScheduledTask>> updater, boolean rollingForward) {
       this.updater = requireNonNull(updater);
-      this.successStatus = requireNonNull(successStatus);
-      this.failureStatus = requireNonNull(failureStatus);
+      this.rollingForward = rollingForward;
     }
 
-    public OneWayJobUpdater<Integer, Optional<IScheduledTask>> getUpdater() {
+    OneWayJobUpdater<Integer, Optional<IScheduledTask>> getUpdater() {
       return updater;
     }
 
-    public JobUpdateStatus getSuccessStatus() {
-      return successStatus;
+    JobUpdateStatus getSuccessStatus() {
+      return rollingForward ? JobUpdateStatus.ROLLED_FORWARD : JobUpdateStatus.ROLLED_BACK;
     }
 
-    public JobUpdateStatus getFailureStatus() {
-      return failureStatus;
+    JobUpdateStatus getFailureStatus() {
+      return rollingForward ? JobUpdateStatus.ROLLING_BACK : JobUpdateStatus.FAILED;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java b/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
index defdf6e..a0e64f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
@@ -16,8 +16,8 @@ package org.apache.aurora.scheduler.updater;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
-import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.InstanceUpdateStatus;
 import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus;
 import static org.junit.Assert.assertEquals;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9fd87234/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
index 6169471..ad4040c 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.updater;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -24,8 +25,13 @@ import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.updater.InstanceAction.ADD_TASK;
+import static org.apache.aurora.scheduler.updater.InstanceAction.AWAIT_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceAction.KILL_TASK;
+import static org.apache.aurora.scheduler.updater.InstanceAction.WATCH_TASK;
 import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
 import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
@@ -40,7 +46,6 @@ import static org.junit.Assert.fail;
 
 public class OneWayJobUpdaterTest extends EasyMockTest {
   private static final Set<Integer> EMPTY = ImmutableSet.of();
-  private static final Map<Integer, InstanceAction> NO_ACTIONS = ImmutableMap.of();
 
   private UpdateStrategy<Integer> strategy;
   private StateEvaluator<String> instance0;
@@ -67,9 +72,9 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
     stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { });
   }
 
-  private void evaluate(OneWayStatus expectedStatus, Map<Integer, InstanceAction> expectedActions) {
+  private void evaluate(OneWayStatus expectedStatus, Map<Integer, SideEffect> expectedSideEffects) {
     assertEquals(
-        new EvaluationResult<>(expectedStatus, expectedActions),
+        new EvaluationResult<>(expectedStatus, expectedSideEffects),
         jobUpdater.evaluate(ImmutableMap.<Integer, String>of(), stateProvider));
   }
 
@@ -77,10 +82,10 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
       int instanceId,
       String state,
       OneWayStatus expectedStatus,
-      Map<Integer, InstanceAction> expectedActions) {
+      Map<Integer, SideEffect> expectedSideEffects) {
 
     assertEquals(
-        new EvaluationResult<>(expectedStatus, expectedActions),
+        new EvaluationResult<>(expectedStatus, expectedSideEffects),
         jobUpdater.evaluate(ImmutableMap.of(instanceId, state), stateProvider));
   }
 
@@ -102,6 +107,18 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
     expect(instanceMock.evaluate(state)).andReturn(result);
   }
 
+  private static SideEffect sideEffect(InstanceAction action, InstanceUpdateStatus... statuses) {
+    return new SideEffect(
+        Optional.of(action),
+        ImmutableSet.<InstanceUpdateStatus>builder().add(statuses).build());
+  }
+
+  private static SideEffect sideEffect(InstanceUpdateStatus... statuses) {
+    return new SideEffect(
+        Optional.<InstanceAction>absent(),
+        ImmutableSet.<InstanceUpdateStatus>builder().add(statuses).build());
+  }
+
   @Test
   public void testSuccessfulUpdate() {
     expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
@@ -139,28 +156,31 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
     evaluate(
         OneWayStatus.WORKING,
         ImmutableMap.of(
-            0, InstanceAction.KILL_TASK,
-            2, InstanceAction.ADD_TASK));
+            0, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING),
+            2, sideEffect(ADD_TASK, InstanceUpdateStatus.WORKING)));
     evaluate(
         0,
         s0,
         OneWayStatus.WORKING,
-        ImmutableMap.of(0, InstanceAction.AWAIT_STATE_CHANGE));
+        ImmutableMap.of(0, sideEffect(AWAIT_STATE_CHANGE)));
     evaluate(
         0,
         s0,
         OneWayStatus.WORKING,
-        NO_ACTIONS);
+        ImmutableMap.of(0, sideEffect(InstanceUpdateStatus.SUCCEEDED)));
     evaluate(
         2,
         s2,
         OneWayStatus.WORKING,
-        ImmutableMap.of(3, InstanceAction.WATCH_TASK));
+        ImmutableMap.of(
+            1, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED),
+            2, sideEffect(InstanceUpdateStatus.SUCCEEDED),
+            3, sideEffect(WATCH_TASK, InstanceUpdateStatus.WORKING)));
     evaluate(
         3,
         s3,
         OneWayStatus.SUCCEEDED,
-        NO_ACTIONS);
+        ImmutableMap.of(3, sideEffect(InstanceUpdateStatus.SUCCEEDED)));
   }
 
   @Test
@@ -187,7 +207,8 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
     evaluate(
         OneWayStatus.FAILED,
         ImmutableMap.of(
-            1, InstanceAction.KILL_TASK));
+            0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.FAILED),
+            1, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING)));
 
     // The updater should now reject further attempts to evaluate.
     try {
@@ -232,7 +253,9 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
 
     evaluate(
         OneWayStatus.WORKING,
-        ImmutableMap.of(1, InstanceAction.AWAIT_STATE_CHANGE));
+        ImmutableMap.of(
+            0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED),
+            1, sideEffect(AWAIT_STATE_CHANGE, InstanceUpdateStatus.WORKING)));
 
     // Instance 0 is already considered finished, so any further notifications of its state will
     // no-op.
@@ -240,7 +263,7 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
         0,
         s0,
         OneWayStatus.WORKING,
-        NO_ACTIONS);
+        ImmutableMap.<Integer, SideEffect>of());
   }
 
   @Test
@@ -249,16 +272,16 @@ public class OneWayJobUpdaterTest extends EasyMockTest {
 
     EvaluationResult<String> a = new EvaluationResult<>(
         OneWayStatus.WORKING,
-        ImmutableMap.of("a", InstanceAction.KILL_TASK));
+        ImmutableMap.of("a", sideEffect(KILL_TASK)));
     EvaluationResult<String> a2 = new EvaluationResult<>(
         OneWayStatus.WORKING,
-        ImmutableMap.of("a", InstanceAction.KILL_TASK));
+        ImmutableMap.of("a", sideEffect(KILL_TASK)));
     EvaluationResult<String> b = new EvaluationResult<>(
         OneWayStatus.WORKING,
-        ImmutableMap.of("b", InstanceAction.KILL_TASK));
+        ImmutableMap.of("b", sideEffect(KILL_TASK)));
     EvaluationResult<String> c = new EvaluationResult<>(
         OneWayStatus.FAILED,
-        ImmutableMap.of("b", InstanceAction.KILL_TASK));
+        ImmutableMap.of("b", sideEffect(KILL_TASK)));
     assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a2, b));
     assertEquals(a, a2);
     assertNotEquals(a, b);


Mime
View raw message