aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Refactoring SchedulerCore (killTasks)
Date Tue, 22 Jul 2014 22:41:15 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master d97066e5c -> 46c9011c7


Refactoring SchedulerCore (killTasks)

Bugs closed: AURORA-94

Reviewed at https://reviews.apache.org/r/23254/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/46c9011c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/46c9011c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/46c9011c

Branch: refs/heads/master
Commit: 46c9011c756ac3c17df53e473a06b31e2875736e
Parents: d97066e
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Jul 22 15:40:38 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Jul 22 15:40:38 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/http/ServletModule.java    |   9 -
 .../aurora/scheduler/state/SchedulerCore.java   |   9 -
 .../scheduler/state/SchedulerCoreImpl.java      |  39 --
 .../thrift/SchedulerThriftInterface.java        | 171 ++++++---
 .../apache/aurora/scheduler/thrift/Util.java    |  30 +-
 .../thrift/aop/LoggingInterceptor.java          |   2 +-
 .../configuration/ConfigurationManagerTest.java |  13 +-
 .../state/BaseSchedulerCoreImplTest.java        | 375 -------------------
 .../scheduler/state/StateManagerImplTest.java   |  79 +++-
 .../scheduler/storage/StorageBackfillTest.java  | 185 +++++++++
 .../thrift/SchedulerThriftInterfaceTest.java    | 124 ++++--
 11 files changed, 492 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
index 37ffc47..729e0ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
@@ -39,10 +39,6 @@ import com.twitter.common.net.pool.DynamicHostSet;
 import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
 import com.twitter.thrift.ServiceInstance;
 
-import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.SchedulerCore;
-import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.mortbay.servlet.GzipFilter;
 
 import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
@@ -69,11 +65,6 @@ public class ServletModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    requireBinding(SchedulerCore.class);
-    requireBinding(CronJobManager.class);
-    requireBinding(IServerInfo.class);
-    requireBinding(QuotaManager.class);
-
     // Register /api end point
     Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 628464d..137bd6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -19,7 +19,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
@@ -70,14 +69,6 @@ public interface SchedulerCore {
   void setTaskStatus(String taskId, ScheduleStatus status, Optional<String> message);
 
   /**
-   * Kills a specific set of tasks.
-   *
-   * @param query Builder for a query to identify tasks
-   * @param user Name of the user performing the kill.
-   */
-  void killTasks(Query.Builder query, String user);
-
-  /**
    * Initiates a restart of shards within an active job.
    *
    * @param jobKey Key of job to be restarted.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index b6d06f3..adc2dbc 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -23,7 +23,6 @@ import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.twitter.common.args.Arg;
@@ -52,9 +51,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
-import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 
 /**
@@ -215,42 +212,6 @@ class SchedulerCoreImpl implements SchedulerCore {
   }
 
   @Override
-  public synchronized void killTasks(Query.Builder query, final String user) {
-
-    requireNonNull(query);
-    LOG.info("Killing tasks matching " + query);
-
-    if (Query.isSingleJobScoped(query)) {
-      // If this looks like a query for all tasks in a job, instruct the cron scheduler to delete
-      // it.
-      // TODO(maxim): Should be trivial to support killing multiple jobs instead.
-      // TODO(mchucarroll): deprecate cron as a part of create/kill job.  (AURORA-454)
-      IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
-      LOG.warning("Deprecated behavior: descheduling job " + jobKey
-          + " with cron via killTasks. (See AURORA-454)");
-      cronJobManager.deleteJob(jobKey);
-    }
-
-    // Unless statuses were specifically supplied, only attempt to kill active tasks.
-    final Query.Builder taskQuery = query.get().isSetStatuses()
-        ? query.byStatus(ACTIVE_STATES)
-        : query;
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        for (String taskId : Tasks.ids(storeProvider.getTaskStore().fetchTasks(taskQuery))) {
-          stateManager.changeState(
-              taskId,
-              Optional.<ScheduleStatus>absent(),
-              KILLING,
-              Optional.of("Killed by " + user));
-        }
-      }
-    });
-  }
-
-  @Override
   public void restartShards(
       IJobKey jobKey,
       final Set<Integer> shards,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 95ee7b7..7f94e04 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -113,6 +113,7 @@ import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
+import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -146,6 +147,7 @@ import static org.apache.aurora.gen.ResponseCode.LOCK_ERROR;
 import static org.apache.aurora.gen.ResponseCode.OK;
 import static org.apache.aurora.gen.ResponseCode.WARNING;
 import static org.apache.aurora.gen.apiConstants.CURRENT_API_VERSION;
+import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
 import static org.apache.aurora.scheduler.thrift.Util.addMessage;
 import static org.apache.aurora.scheduler.thrift.Util.emptyResponse;
 
@@ -179,6 +181,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final CronPredictor cronPredictor;
   private final QuotaManager quotaManager;
   private final NearestFit nearestFit;
+  private final StateManager stateManager;
 
   @Inject
   SchedulerThriftInterface(
@@ -192,7 +195,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       CronPredictor cronPredictor,
       MaintenanceController maintenance,
       QuotaManager quotaManager,
-      NearestFit nearestFit) {
+      NearestFit nearestFit,
+      StateManager stateManager) {
 
     this(storage,
         schedulerCore,
@@ -204,7 +208,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         cronJobManager,
         cronPredictor,
         quotaManager,
-        nearestFit);
+        nearestFit,
+        stateManager);
   }
 
   @VisibleForTesting
@@ -219,7 +224,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       CronJobManager cronJobManager,
       CronPredictor cronPredictor,
       QuotaManager quotaManager,
-      NearestFit nearestFit) {
+      NearestFit nearestFit,
+      StateManager stateManager) {
 
     this.storage = requireNonNull(storage);
     this.schedulerCore = requireNonNull(schedulerCore);
@@ -232,6 +238,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.cronPredictor = requireNonNull(cronPredictor);
     this.quotaManager = requireNonNull(quotaManager);
     this.nearestFit = requireNonNull(nearestFit);
+    this.stateManager = requireNonNull(stateManager);
   }
 
   @Override
@@ -249,7 +256,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
     } catch (AuthFailedException e) {
-      return addMessage(response.setResponseCode(AUTH_FAILED), e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     }
 
     try {
@@ -262,9 +269,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       schedulerCore.createJob(sanitized);
       response.setResponseCode(OK);
     } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e.getMessage());
+      addMessage(response, LOCK_ERROR, e);
     } catch (TaskDescriptionException | ScheduleException e) {
-      addMessage(response, INVALID_REQUEST, e.getMessage());
+      addMessage(response, INVALID_REQUEST, e);
     }
     return response;
   }
@@ -284,8 +291,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
     } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED);
-      return addMessage(response, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     }
 
     try {
@@ -313,14 +319,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           cronJobManager.createJob(SanitizedCronJob.from(sanitized));
         }
       } catch (CronException e) {
-        addMessage(response, INVALID_REQUEST, e.getMessage());
+        addMessage(response, INVALID_REQUEST, e);
         return response;
       }
       response.setResponseCode(OK);
     } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e.getMessage());
+      addMessage(response, LOCK_ERROR, e);
     } catch (TaskDescriptionException e) {
-      addMessage(response, INVALID_REQUEST, e.getMessage());
+      addMessage(response, INVALID_REQUEST, e);
     }
     return response;
   }
@@ -346,7 +352,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       }
       response.setResponseCode(OK);
     } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e.getMessage());
+      addMessage(response, LOCK_ERROR, e);
     }
     return response;
   }
@@ -366,7 +372,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole()));
     } catch (AuthFailedException e) {
-      return addMessage(response, AUTH_FAILED, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     }
 
     try {
@@ -377,9 +383,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       cronJobManager.updateJob(SanitizedCronJob.fromUnsanitized(job));
       return response.setResponseCode(OK);
     } catch (LockException e) {
-      return addMessage(response, LOCK_ERROR, e.getMessage());
+      return addMessage(response, LOCK_ERROR, e);
     } catch (CronException | TaskDescriptionException e) {
-      return addMessage(response, INVALID_REQUEST, e.getMessage());
+      return addMessage(response, INVALID_REQUEST, e);
     }
   }
 
@@ -412,7 +418,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
-      return addMessage(response, AUTH_FAILED, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     }
 
     try {
@@ -667,7 +673,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
   private SessionContext validateSessionKeyForTasks(
       SessionKey session,
-      TaskQuery taskQuery,
+      Query.Builder taskQuery,
       Iterable<IScheduledTask> tasks) throws AuthFailedException {
 
     // Authenticate the session against any affected roles, always including the role for a
@@ -675,8 +681,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     // authenticated this way.
     ImmutableSet.Builder<String> targetRoles = ImmutableSet.<String>builder()
         .addAll(FluentIterable.from(tasks).transform(GET_ROLE));
-    if (taskQuery.isSetOwner()) {
-      targetRoles.add(taskQuery.getOwner().getRole());
+    if (taskQuery.get().isSetOwner()) {
+      targetRoles.add(taskQuery.get().getOwner().getRole());
     }
     return sessionValidator.checkAuthenticated(session, targetRoles.build());
   }
@@ -691,40 +697,79 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) {
-    requireNonNull(query);
+  public Response killTasks(
+      final TaskQuery mutableQuery,
+      final Lock mutableLock,
+      final SessionKey session) {
+
+    requireNonNull(mutableQuery);
     requireNonNull(session);
 
-    Response response = Util.emptyResponse();
+    final Response response = emptyResponse();
 
-    if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) {
+    if (mutableQuery.getJobName() != null && StringUtils.isBlank(mutableQuery.getJobName())) {
       return addMessage(
           response,
           INVALID_REQUEST,
-          String.format("Invalid job name: '%s'", query.getJobName()));
+          String.format("Invalid job name: '%s'", mutableQuery.getJobName()));
     }
 
-    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query));
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        Query.Builder query = Query.arbitrary(mutableQuery);
 
-    Optional<SessionContext> context = isAdmin(session);
-    if (context.isPresent()) {
-      LOG.info("Granting kill query to admin user: " + query);
-    } else {
-      try {
-        context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
-      } catch (AuthFailedException e) {
-        return addMessage(response, AUTH_FAILED, e.getMessage());
-      }
-    }
+        // Check single job scoping before adding statuses.
+        boolean isSingleJobScoped = Query.isSingleJobScoped(query);
 
-    try {
-      validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks);
-      schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity());
-    } catch (LockException e) {
-      return addMessage(response, LOCK_ERROR, e.getMessage());
-    }
+        // Unless statuses were specifically supplied, only attempt to kill active tasks.
+        query = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
+
+        final Set<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
 
-    return response.setResponseCode(OK);
+        Optional<SessionContext> context = isAdmin(session);
+        if (context.isPresent()) {
+          LOG.info("Granting kill query to admin user: " + query);
+        } else {
+          try {
+            context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
+          } catch (AuthFailedException e) {
+            return addMessage(response, AUTH_FAILED, e);
+          }
+        }
+
+        try {
+          validateLockForTasks(
+              Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER),
+              tasks);
+        } catch (LockException e) {
+          return addMessage(response, LOCK_ERROR, e);
+        }
+
+        LOG.info("Killing tasks matching " + query);
+
+        boolean tasksKilled = false;
+        if (isSingleJobScoped) {
+          // If this looks like a query for all tasks in a job, instruct the cron
+          // scheduler to delete it.
+          // TODO(mchucarroll): deprecate cron as a part of create/kill job.  (AURORA-454)
+          IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
+          LOG.warning("Deprecated behavior: descheduling job " + jobKey
+              + " with cron via killTasks. (See AURORA-454)");
+          tasksKilled = cronJobManager.deleteJob(jobKey);
+        }
+
+        for (String taskId : Tasks.ids(tasks)) {
+          tasksKilled |= stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              ScheduleStatus.KILLING,
+              killedByMessage(context.get().getIdentity()));
+        }
+
+        return tasksKilled ? okEmptyResponse() : addMessage(response, OK, NO_TASKS_TO_KILL_MESSAGE);
+      }
+    });
   }
 
   @Override
@@ -743,7 +788,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
-      return addMessage(response, AUTH_FAILED, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     }
 
     try {
@@ -753,9 +798,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
       response.setResponseCode(OK);
     } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e.getMessage());
+      addMessage(response, LOCK_ERROR, e);
     } catch (ScheduleException e) {
-      addMessage(response, INVALID_REQUEST, e.getMessage());
+      addMessage(response, INVALID_REQUEST, e);
     }
 
     return response;
@@ -789,7 +834,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       quotaManager.saveQuota(ownerRole, IResourceAggregate.build(resourceAggregate));
       return response.setResponseCode(OK);
     } catch (QuotaException e) {
-      return addMessage(response, INVALID_REQUEST, e.getMessage());
+      return addMessage(response, INVALID_REQUEST, e);
     }
   }
 
@@ -835,7 +880,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
       context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
     } catch (AuthFailedException e) {
-      addMessage(response, AUTH_FAILED, e.getMessage());
+      addMessage(response, AUTH_FAILED, e);
       return response;
     }
 
@@ -861,7 +906,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       recovery.stage(backupId);
     } catch (RecoveryException e) {
-      addMessage(response, ERROR, e.getMessage());
+      addMessage(response, ERROR, e);
       LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e);
     }
 
@@ -876,7 +921,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           .setResult(Result.queryRecoveryResult(new QueryRecoveryResult()
               .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
     } catch (RecoveryException e) {
-      addMessage(response, ERROR, e.getMessage());
+      addMessage(response, ERROR, e);
       LOG.log(Level.WARNING, "Failed to query recovery: " + e, e);
     }
 
@@ -889,7 +934,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       recovery.deleteTasks(Query.arbitrary(query));
     } catch (RecoveryException e) {
-      addMessage(response, ERROR, e.getMessage());
+      addMessage(response, ERROR, e);
       LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e);
     }
 
@@ -902,7 +947,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       recovery.commit();
     } catch (RecoveryException e) {
-      addMessage(response, ERROR, e.getMessage());
+      addMessage(response, ERROR, e);
     }
 
     return response;
@@ -922,7 +967,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return response.setResponseCode(OK);
     } catch (Storage.StorageException e) {
       LOG.log(Level.WARNING, "Requested snapshot failed.", e);
-      return addMessage(response, ERROR, e.getMessage());
+      return addMessage(response, ERROR, e);
     }
   }
 
@@ -1107,11 +1152,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       schedulerCore.addInstances(jobKey, ImmutableSet.copyOf(config.getInstanceIds()), task);
       return resp.setResponseCode(OK);
     } catch (AuthFailedException e) {
-      return addMessage(resp, AUTH_FAILED, e.getMessage());
+      return addMessage(resp, AUTH_FAILED, e);
     } catch (LockException e) {
-      return addMessage(resp, LOCK_ERROR, e.getMessage());
+      return addMessage(resp, LOCK_ERROR, e);
     } catch (TaskDescriptionException | ScheduleException e) {
-      return addMessage(resp, INVALID_REQUEST, e.getMessage());
+      return addMessage(resp, INVALID_REQUEST, e);
     }
   }
 
@@ -1143,9 +1188,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
       return response.setResponseCode(OK);
     } catch (AuthFailedException e) {
-      return addMessage(response, AUTH_FAILED, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     } catch (LockException e) {
-      return addMessage(response, LOCK_ERROR, e.getMessage());
+      return addMessage(response, LOCK_ERROR, e);
     }
   }
 
@@ -1169,9 +1214,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       lockManager.releaseLock(lock);
       return response.setResponseCode(OK);
     } catch (AuthFailedException e) {
-      return addMessage(response, AUTH_FAILED, e.getMessage());
+      return addMessage(response, AUTH_FAILED, e);
     } catch (LockException e) {
-      return addMessage(response, LOCK_ERROR, e.getMessage());
+      return addMessage(response, LOCK_ERROR, e);
     }
   }
 
@@ -1186,6 +1231,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return Optional.of("Transition forced by " + user);
   }
 
+  @VisibleForTesting
+  static Optional<String> killedByMessage(String user) {
+    return Optional.of("Killed by " + user);
+  }
+
+  @VisibleForTesting
+  static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";
+
   private static Response okEmptyResponse()  {
     return emptyResponse().setResponseCode(OK);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/thrift/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/Util.java b/src/main/java/org/apache/aurora/scheduler/thrift/Util.java
index 4ebb608..18e2bdf 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/Util.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/Util.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.thrift;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.twitter.common.base.MorePreconditions;
 
 import org.apache.aurora.gen.Response;
 import org.apache.aurora.gen.ResponseCode;
@@ -47,12 +48,7 @@ public final class Util {
    * @return {@code response} with {@code message} included.
    */
   public static Response addMessage(Response response, String message) {
-    String existingMessage = response.getMessageDEPRECATED();
-    String prefix = Strings.isNullOrEmpty(existingMessage) ? "" : existingMessage + ", ";
-    response.setMessageDEPRECATED(prefix + message);
-
-    response.addToDetails(new ResponseDetail(message));
-    return response;
+    return appendMessage(response, MorePreconditions.checkNotBlank(message));
   }
 
   /**
@@ -67,4 +63,26 @@ public final class Util {
   public static Response addMessage(Response response, ResponseCode code, String message) {
     return addMessage(response.setResponseCode(code), message);
   }
+
+  /**
+   * Identical to {@link #addMessage(Response, String)} that also applies a response code and
+   * extracts a message from the provided {@link Throwable}.
+   *
+   * @param response Response to augment.
+   * @param code Response code to include.
+   * @param throwable {@link Throwable} to extract message from.
+   * @return {@link #addMessage(Response, String)}
+   */
+  public static Response addMessage(Response response, ResponseCode code, Throwable throwable) {
+    return appendMessage(response.setResponseCode(code), throwable.getMessage());
+  }
+
+  private static Response appendMessage(Response response, String message) {
+    String existingMessage = response.getMessageDEPRECATED();
+    String prefix = Strings.isNullOrEmpty(existingMessage) ? "" : existingMessage + ", ";
+    response.setMessageDEPRECATED(prefix + message);
+
+    response.addToDetails(new ResponseDetail(message));
+    return response;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
index 8692051..a21ab90 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/aop/LoggingInterceptor.java
@@ -87,7 +87,7 @@ class LoggingInterceptor implements MethodInterceptor {
       return invocation.proceed();
     } catch (RuntimeException e) {
       LOG.log(Level.WARNING, "Uncaught exception while handling " + message, e);
-      return Util.addMessage(Util.emptyResponse(), ResponseCode.ERROR, e.getMessage());
+      return Util.addMessage(Util.emptyResponse(), ResponseCode.ERROR, e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
index 2298971..ee95875 100644
--- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.apache.aurora.gen.test.testConstants.INVALID_IDENTIFIERS;
 import static org.apache.aurora.gen.test.testConstants.VALID_IDENTIFIERS;
+import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.isGoodIdentifier;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -71,10 +72,14 @@ public class ConfigurationManagerTest {
                       new Constraint()
                           .setName("host")
                           .setConstraint(TaskConstraint.limit(new LimitConstraint()
-                              .setLimit(1))))))
-      .setOwner(new Identity()
-          .setRole("owner-role")
-          .setUser("owner-user"));
+                              .setLimit(1))),
+                      new Constraint()
+                          .setName(DEDICATED_ATTRIBUTE)
+                          .setConstraint(TaskConstraint.value(new ValueConstraint(
+                              false, ImmutableSet.of("foo"))))))
+              .setOwner(new Identity()
+                  .setRole("owner-role")
+                  .setUser("owner-user")));
 
   @Test
   public void testIsGoodIdentifier() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index d2bd65e..5ec7cf3 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -15,15 +15,10 @@ package org.apache.aurora.scheduler.state;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
@@ -31,7 +26,6 @@ import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
@@ -40,18 +34,14 @@ import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
-import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
@@ -60,13 +50,10 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.cron.CrontabEntry;
-import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
@@ -75,15 +62,12 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.StorageBackfill;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
-import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,7 +76,6 @@ import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
@@ -107,7 +90,6 @@ import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.reportMatcher;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -125,17 +107,10 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private static final String ENV_A = "Test_Env_A";
   private static final String JOB_A = "Test_Job_A";
   private static final IJobKey KEY_A = JobKeys.from(ROLE_A, ENV_A, JOB_A);
-  private static final int ONE_GB = 1024;
-
-  private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
-  private static final String SLAVE_HOST_1 = "SlaveHost1";
 
   private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
 
-  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("1 1 1 1 *");
-  public static final String RAW_CRONTAB_ENTRY = CRONTAB_ENTRY.toString();
-
   private Driver driver;
   private StateManagerImpl stateManager;
   private Storage storage;
@@ -290,102 +265,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testLoadTasksFromStorage() throws Exception {
-    final String storedTaskId = "task_on_disk";
-
-    control.replay();
-
-    storage = createStorage();
-    String thermosConfig = "thermosConfig";
-
-    final TaskConfig storedTask = new TaskConfig()
-        .setOwner(OWNER_A)
-        .setJobName(JOB_A)
-        .setEnvironment(ENV_A)
-        .setNumCpus(1.0)
-        .setRamMb(ONE_GB)
-        .setDiskMb(500)
-        .setExecutorConfig(new ExecutorConfig("AuroraExecutor", thermosConfig))
-        .setRequestedPorts(ImmutableSet.<String>of())
-        .setConstraints(ImmutableSet.<Constraint>of())
-        .setTaskLinks(ImmutableMap.<String, String>of());
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
-            IScheduledTask.build(
-              new ScheduledTask()
-                  .setStatus(PENDING)
-                  .setTaskEvents(ImmutableList.of(new TaskEvent(100, ScheduleStatus.PENDING)))
-                  .setAssignedTask(
-                      new AssignedTask()
-                          .setTaskId(storedTaskId)
-                          .setInstanceId(0)
-                          .setTask(storedTask)))));
-      }
-    });
-
-    buildScheduler(storage);
-
-    assignTask(storedTaskId, SLAVE_ID, SLAVE_HOST_1);
-
-    // Since task fields are backfilled with defaults, additional flags should be filled.
-    ITaskConfig expected = ITaskConfig.build(new TaskConfig(storedTask)
-        .setProduction(false)
-        .setMaxTaskFailures(1)
-        .setExecutorConfig(new ExecutorConfig("AuroraExecutor", thermosConfig))
-        .setConstraints(ImmutableSet.of(ConfigurationManager.hostLimitConstraint(1))));
-
-    assertEquals(expected, getTask(storedTaskId).getAssignedTask().getTask());
-    assertEquals(ASSIGNED, getTask(storedTaskId).getStatus());
-  }
-
-  private void assignTask(String taskId, SlaveID slaveId, String slaveHost, Set<Integer> ports) {
-    stateManager.assignTask(taskId, slaveHost, slaveId, ports);
-  }
-
-  private void assignTask(String taskId, SlaveID slaveId, String slaveHost) {
-    assignTask(taskId, slaveId, slaveHost, ImmutableSet.<Integer>of());
-  }
-
-  @Test
-  public void testShardUniquenessCorrection() throws Exception {
-    control.replay();
-
-    storage = createStorage();
-
-    final AtomicInteger taskId = new AtomicInteger();
-
-    SanitizedConfiguration job = makeJob(KEY_A, 10);
-    final Set<IScheduledTask> badTasks = ImmutableSet.copyOf(Iterables
-        .transform(job.getTaskConfigs().values(),
-            new Function<ITaskConfig, IScheduledTask>() {
-              @Override
-              public IScheduledTask apply(ITaskConfig task) {
-                return IScheduledTask.build(new ScheduledTask()
-                    .setStatus(RUNNING)
-                    .setAssignedTask(
-                        new AssignedTask()
-                            .setInstanceId(0)
-                            .setTaskId("task-" + taskId.incrementAndGet())
-                            .setTask(task.newBuilder())));
-              }
-            }));
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(badTasks);
-      }
-    });
-
-    buildScheduler(storage);
-    assertEquals(1, getTasksByStatus(RUNNING).size());
-    assertEquals(9, getTasksByStatus(KILLED).size());
-  }
-
-  @Test
   public void testRejectsBadIdentifiers() throws Exception {
     control.replay();
     buildScheduler();
@@ -446,50 +325,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(makeJob(KEY_A, 1));
   }
 
-  @Test
-  public void testKillTask() throws Exception {
-    driver.killTask(EasyMock.<String>anyObject());
-    // We only expect three kills because the first test does not move out of PENDING.
-    expectLastCall().times(3);
-
-    control.replay();
-    buildScheduler();
-
-    for (Set<ScheduleStatus> statuses : ImmutableSet.of(
-        ImmutableSet.<ScheduleStatus>of(),
-        EnumSet.of(ASSIGNED),
-        EnumSet.of(ASSIGNED, STARTING),
-        EnumSet.of(ASSIGNED, STARTING, RUNNING))) {
-
-      scheduler.createJob(makeJob(KEY_A, 1));
-      String taskId = Tasks.id(getOnlyTask(
-          Query.jobScoped(KEY_A).active()));
-
-      for (ScheduleStatus status : statuses) {
-        changeStatus(taskId, status);
-      }
-
-      scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
-
-      if (!statuses.isEmpty()) {
-        // If there was no move out of the PENDING state, the task is deleted outright.
-        assertEquals(KILLING, getTask(taskId).getStatus());
-      }
-
-      // Simulate a KILLED ack from the executor.
-      changeStatus(Query.roleScoped(ROLE_A), KILLED);
-      assertTrue(
-          getTasks(Query.jobScoped(KEY_A).active()).isEmpty());
-    }
-  }
-
-  @Test
-  public void testKillNoTasksDoesNotThrow() throws Exception {
-    control.replay();
-    buildScheduler();
-    scheduler.killTasks(Query.roleScoped("role_absent"), OWNER_A.getUser());
-  }
-
   private IExpectationSetters<Long> expectTaskNotThrottled() {
     return expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
         .andReturn(0L);
@@ -596,26 +431,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testNoTransitionFromTerminalState() throws Exception {
-    expectKillTask(1);
-    expectNoCronJob(KEY_A);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING, RUNNING);
-    scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
-    changeStatus(Query.roleScoped(ROLE_A), KILLED);
-
-    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-
-    // This transition should be rejected.
-    changeStatus(Query.roleScoped(ROLE_A), LOST);
-    assertEquals(KILLED, getTask(taskId).getStatus());
-  }
-
-  @Test
   public void testFailedTaskIncrementsFailureCount() throws Exception {
     int maxFailures = 5;
     expectTaskNotThrottled().times(maxFailures - 1);
@@ -652,55 +467,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillPendingTask() throws Exception {
-    expectNoCronJob(KEY_A);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    assertTaskCount(1);
-
-    Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
-    assertEquals(1, tasks.size());
-
-    String taskId = Tasks.id(Iterables.get(tasks, 0));
-
-    scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
-    assertTaskCount(0);
-  }
-
-  @Test
-  public void testKillRunningTask() throws Exception {
-    expectKillTask(1);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
-    scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
-    assertEquals(KILLING, getTask(taskId).getStatus());
-    assertEquals(1, getTasks(Query.roleScoped(ROLE_A)).size());
-    changeStatus(taskId, KILLED);
-    assertEquals(KILLED, getTask(taskId).getStatus());
-  }
-
-  @Test
-  public void testKillCronTask() throws Exception {
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-    cronJobManager.createJob(anyObject(SanitizedCronJob.class));
-    expect(cronJobManager.deleteJob(KEY_A)).andReturn(true);
-    control.replay();
-    buildScheduler();
-    scheduler.createJob(makeCronJob(KEY_A, 1, RAW_CRONTAB_ENTRY));
-
-    // This will fail if the cron task could not be found.
-    scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
-  }
-
-  @Test
   public void testLostTaskRescheduled() throws Exception {
     expectKillTask(2);
     expectTaskNotThrottled().times(2);
@@ -734,41 +500,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillJob() throws Exception {
-    expectNoCronJob(KEY_A);
-    expect(cronJobManager.deleteJob(KEY_A)).andReturn(false);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 10));
-    assertTaskCount(10);
-
-    scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
-    assertTaskCount(0);
-  }
-
-  @Test
-  public void testKillJob2() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 5));
-    assertTaskCount(5);
-
-    scheduler.createJob(
-        makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "2")), 5));
-    assertTaskCount(10);
-
-    scheduler.killTasks(queryJob(OWNER_A, JOB_A + "2"), OWNER_A.getUser());
-    assertTaskCount(5);
-
-    for (IScheduledTask state : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
-      assertEquals(JOB_A, Tasks.getJob(state));
-    }
-  }
-
-  @Test
   public void testRestartShards() throws Exception {
     expectKillTask(2);
     expectTaskNotThrottled().times(2);
@@ -810,56 +541,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testPortResource() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    TaskConfig config = productionTask()
-        .setRequestedPorts(ImmutableSet.of("one", "two", "three"));
-
-    scheduler.createJob(makeJob(KEY_A, config, 1));
-
-    String taskId = Tasks.id(getOnlyTask(
-        Query.instanceScoped(KEY_A, 0).active()));
-
-    assignTask(taskId, SLAVE_ID, SLAVE_HOST_1, ImmutableSet.of(80, 81, 82));
-
-    IAssignedTask task = getTask(taskId).getAssignedTask();
-    assertEquals(
-        ImmutableSet.of("one", "two", "three"),
-        task.getTask().getRequestedPorts());
-  }
-
-  @Test
-  public void testPortResourceResetAfterReschedule() throws Exception {
-    expectKillTask(1);
-    expectTaskNotThrottled();
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
-    control.replay();
-    buildScheduler();
-
-    TaskConfig config = productionTask().setRequestedPorts(ImmutableSet.of("one"));
-
-    scheduler.createJob(makeJob(KEY_A, config, 1));
-
-    String taskId = Tasks.id(getOnlyTask(
-        Query.instanceScoped(KEY_A, 0).active()));
-
-    assignTask(taskId, SLAVE_ID, SLAVE_HOST_1, ImmutableSet.of(80));
-
-    // The task should be rescheduled.
-    changeStatus(taskId, LOST);
-
-    String newTaskId = Tasks.id(getOnlyTask(
-        Query.instanceScoped(KEY_A, 0).active()));
-    assignTask(newTaskId, SLAVE_ID, SLAVE_HOST_1, ImmutableSet.of(86));
-
-    IAssignedTask task = getTask(newTaskId).getAssignedTask();
-    assertEquals(ImmutableMap.of("one", 86), task.getAssignedPorts());
-  }
-
-  @Test
   public void testAuditMessage() throws Exception {
     control.replay();
     buildScheduler();
@@ -1044,16 +725,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     return makeIdentity(jobKey.getRole());
   }
 
-  private static SanitizedConfiguration makeCronJob(
-      IJobKey jobKey,
-      int numDefaultTasks,
-      String cronSchedule) throws TaskDescriptionException {
-
-    SanitizedConfiguration job = makeJob(jobKey, numDefaultTasks);
-    return new SanitizedConfiguration(
-        IJobConfiguration.build(job.getJobConfig().newBuilder().setCronSchedule(cronSchedule)));
-  }
-
   private static SanitizedConfiguration makeJob(IJobKey jobKey, int numDefaultTasks)
       throws TaskDescriptionException  {
 
@@ -1118,33 +789,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     return Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(status));
   }
 
-  private Set<IScheduledTask> getTasksOwnedBy(Identity owner) {
-    return Storage.Util.consistentFetchTasks(storage, query(owner, null, null));
-  }
-
-  private Query.Builder queryJob(Identity owner, String jobName) {
-    return query(owner, jobName, null);
-  }
-
-  private Query.Builder query(
-      @Nullable Identity owner,
-      @Nullable String jobName,
-      @Nullable Iterable<String> taskIds) {
-
-    TaskQuery query = new TaskQuery();
-    if (owner != null) {
-      query.setOwner(owner);
-    }
-    if (jobName != null) {
-      query.setJobName(jobName);
-    }
-    if (taskIds != null) {
-      query.setTaskIds(Sets.newHashSet(taskIds));
-    }
-
-    return Query.arbitrary(query);
-  }
-
   public void changeStatus(
       Query.Builder query,
       ScheduleStatus status,
@@ -1170,23 +814,4 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
     changeStatus(Query.taskScoped(taskId), status, message);
   }
-
-  private SanitizedConfiguration hasJobKey(final IJobKey key) {
-    reportMatcher(new IArgumentMatcher() {
-      @Override
-      public boolean matches(Object item) {
-        if (!(item instanceof SanitizedConfiguration)) {
-          return false;
-        }
-        SanitizedConfiguration configuration = (SanitizedConfiguration) item;
-        return key.equals(configuration.getJobConfig().getKey());
-      }
-
-      @Override
-      public void appendTo(StringBuffer buffer) {
-        buffer.append(key.toString());
-      }
-    });
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 94eb0a2..1678411 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -57,6 +57,7 @@ import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -203,6 +204,24 @@ public class StateManagerImplTest extends EasyMockTest {
   }
 
   @Test
+  public void testKillRunningTask() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, KILLING, KILLED);
+    driver.killTask(EasyMock.<String>anyObject());
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A);
+    assertEquals(true, changeState(taskId, RUNNING));
+    assertEquals(true, changeState(taskId, KILLING));
+    assertEquals(true, changeState(taskId, KILLED));
+    assertEquals(false, changeState(taskId, KILLED));
+  }
+
+  @Test
   public void testLostKillingTask() {
     ITaskConfig task = makeTask(JIM, MY_JOB);
     String taskId = "a";
@@ -394,6 +413,59 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.deleteTasks(ImmutableSet.of(taskId));
   }
 
+  @Test
+  public void testPortResource() throws Exception {
+    Set<String> requestedPorts = ImmutableSet.of("one", "two", "three");
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder()
+        .setRequestedPorts(requestedPorts));
+
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A, ImmutableSet.of(80, 81, 82));
+
+    IScheduledTask actual = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId)));
+
+    assertEquals(
+        requestedPorts,
+        actual.getAssignedTask().getTask().getRequestedPorts());
+  }
+
+  @Test
+  public void testPortResourceResetAfterReschedule() throws Exception {
+    Set<String> requestedPorts = ImmutableSet.of("one");
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder()
+        .setRequestedPorts(requestedPorts));
+
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+    String newTaskId = "b";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(newTaskId);
+    expectStateTransitions(newTaskId, INIT, PENDING, ASSIGNED);
+    noFlappingPenalty();
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A, ImmutableSet.of(80));
+    changeState(taskId, RUNNING);
+    changeState(taskId, LOST);
+
+    assignTask(newTaskId, HOST_A, ImmutableSet.of(86));
+
+    IScheduledTask actual = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(newTaskId)));
+
+    assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,
@@ -437,7 +509,10 @@ public class StateManagerImplTest extends EasyMockTest {
   }
 
   private void assignTask(String taskId, String host) {
-    stateManager.assignTask(taskId, host, SlaveID.newBuilder().setValue(host).build(),
-        ImmutableSet.<Integer>of());
+    assignTask(taskId, host, ImmutableSet.<Integer>of());
+  }
+
+  private void assignTask(String taskId, String host, Set<Integer> ports) {
+    stateManager.assignTask(taskId, host, SlaveID.newBuilder().setValue(host).build(), ports);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
new file mode 100644
index 0000000..ad2548c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.junit.Assert.assertEquals;
+
+public class StorageBackfillTest {
+  private static final String ROLE = "Test_Role_A";
+  private static final String USER = "Test_User_A";
+  private static final Identity OWNER = new Identity(ROLE, USER);
+  private static final String ENV = "Test_Env";
+  private static final String JOB_NAME = "Test_Job";
+  private static final IJobKey JOB_KEY = JobKeys.from(ROLE, ENV, JOB_NAME);
+  private static final int ONE_GB = 1024;
+  private static final String TASK_ID = "task_id";
+  private static final ExecutorConfig EXECUTOR_CONFIG =
+      new ExecutorConfig("AuroraExecutor", "executorConfig");
+
+  private Storage storage;
+  private Clock clock;
+
+  @Before
+  public void setUp() {
+    storage = MemStorage.newEmptyStorage();
+    clock = new FakeClock();
+  }
+
+  @Test
+  public void testLoadTasksFromStorage() throws Exception {
+    final TaskConfig storedTask = defaultTask();
+
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+            IScheduledTask.build(new ScheduledTask()
+                .setStatus(PENDING)
+                .setTaskEvents(ImmutableList.of(new TaskEvent(100, PENDING)))
+                .setAssignedTask(new AssignedTask()
+                    .setTaskId(TASK_ID)
+                    .setInstanceId(0)
+                    .setTask(storedTask)))));
+      }
+    });
+
+    backfill();
+
+    // Since task fields are backfilled with defaults, additional flags should be filled.
+    ITaskConfig expected = ITaskConfig.build(new TaskConfig(storedTask)
+        .setProduction(false)
+        .setMaxTaskFailures(1)
+        .setExecutorConfig(EXECUTOR_CONFIG)
+        .setConstraints(ImmutableSet.of(ConfigurationManager.hostLimitConstraint(1))));
+
+    assertEquals(expected, getTask(TASK_ID).getAssignedTask().getTask());
+  }
+
+  @Test
+  public void testShardUniquenessCorrection() throws Exception {
+
+    final AtomicInteger taskId = new AtomicInteger();
+
+    SanitizedConfiguration job = makeJob(JOB_KEY, defaultTask(), 10);
+    final Set<IScheduledTask> badTasks = ImmutableSet.copyOf(Iterables.transform(
+        job.getTaskConfigs().values(),
+        new Function<ITaskConfig, IScheduledTask>() {
+          @Override
+          public IScheduledTask apply(ITaskConfig task) {
+            return IScheduledTask.build(new ScheduledTask()
+                .setStatus(RUNNING)
+                .setAssignedTask(new AssignedTask()
+                    .setInstanceId(0)
+                    .setTaskId("task-" + taskId.incrementAndGet())
+                    .setTask(task.newBuilder())));
+          }
+        }));
+
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(badTasks);
+      }
+    });
+
+    backfill();
+
+    assertEquals(1, getTasksByStatus(RUNNING).size());
+    assertEquals(9, getTasksByStatus(KILLED).size());
+  }
+
+  private void backfill() {
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        StorageBackfill.backfill(storeProvider, clock);
+      }
+    });
+  }
+
+  private static SanitizedConfiguration makeJob(
+      IJobKey jobKey,
+      TaskConfig task,
+      int numTasks) throws Exception {
+
+    JobConfiguration job = new JobConfiguration()
+        .setOwner(OWNER)
+        .setKey(jobKey.newBuilder())
+        .setInstanceCount(numTasks)
+        .setTaskConfig(new TaskConfig(task)
+            .setOwner(OWNER)
+            .setEnvironment(jobKey.getEnvironment())
+            .setJobName(jobKey.getName()));
+    return SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(job));
+  }
+
+  private static TaskConfig defaultTask() {
+    return new TaskConfig()
+        .setOwner(OWNER)
+        .setJobName(JOB_NAME)
+        .setEnvironment(ENV)
+        .setNumCpus(1.0)
+        .setRamMb(ONE_GB)
+        .setDiskMb(500)
+        .setExecutorConfig(EXECUTOR_CONFIG)
+        .setRequestedPorts(ImmutableSet.<String>of())
+        .setConstraints(ImmutableSet.<Constraint>of())
+        .setTaskLinks(ImmutableMap.<String, String>of());
+  }
+
+  private IScheduledTask getTask(String taskId) {
+    return Iterables.getOnlyElement(Storage.Util.consistentFetchTasks(
+        storage,
+        Query.taskScoped(taskId)));
+  }
+
+  private Set<IScheduledTask> getTasksByStatus(ScheduleStatus status) {
+    return Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(status));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/46c9011c/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index b28761d..d3ac7c7 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -103,6 +103,7 @@ import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
+import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -138,6 +139,7 @@ import static org.apache.aurora.gen.ResponseCode.WARNING;
 import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.apache.aurora.gen.apiConstants.THRIFT_API_VERSION;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.transitionMessage;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
@@ -159,6 +161,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
   private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
   private static final Lock DEFAULT_LOCK = null;
+  private static final String TASK_ID = "task_id";
 
   private static final IResourceAggregate QUOTA =
       IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048));
@@ -186,6 +189,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private CronPredictor cronPredictor;
   private QuotaManager quotaManager;
   private NearestFit nearestFit;
+  private StateManager stateManager;
 
   @Before
   public void setUp() throws Exception {
@@ -203,6 +207,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     cronPredictor = createMock(CronPredictor.class);
     quotaManager = createMock(QuotaManager.class);
     nearestFit = createMock(NearestFit.class);
+    stateManager = createMock(StateManager.class);
 
     // Use guice and install AuthModule to apply AOP-style auth layer.
     Module testModule = new AbstractModule() {
@@ -222,6 +227,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         bind(IServerInfo.class).toInstance(IServerInfo.build(SERVER_INFO));
         bind(CronPredictor.class).toInstance(cronPredictor);
         bind(NearestFit.class).toInstance(nearestFit);
+        bind(StateManager.class).toInstance(stateManager);
       }
     };
     Injector injector = Guice.createInjector(testModule, new AopModule());
@@ -377,23 +383,36 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     assertResponse(AUTH_FAILED, thrift.createJob(makeJob(), DEFAULT_LOCK, SESSION));
   }
 
-  private static IScheduledTask buildScheduledTask(String jobName) {
+  private IScheduledTask buildScheduledTask() {
+    return buildScheduledTask(JOB_NAME, TASK_ID);
+  }
+
+  private static IScheduledTask buildScheduledTask(String jobName, String taskId) {
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()
+            .setTaskId(taskId)
             .setTask(new TaskConfig()
                 .setOwner(ROLE_IDENTITY)
                 .setEnvironment(DEFAULT_ENVIRONMENT)
                 .setJobName(jobName))));
   }
 
+  private void expectTransitionsToKilling() {
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.KILLING,
+        killedByMessage(USER))).andReturn(true);
+  }
+
   @Test
-  public void testKillTasksImmediate() throws Exception {
+  public void testUserKillTasks() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     expectAuth(ROOT, false);
     expectAuth(ROLE, true);
-    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME));
-    scheduler.killTasks(query, USER);
+    storageUtil.expectTaskFetch(query, buildScheduledTask());
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+    expectTransitionsToKilling();
 
     control.replay();
 
@@ -401,14 +420,53 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testAdminKillTasks() throws Exception {
+    Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
+    expectAuth(ROOT, true);
+    storageUtil.expectTaskFetch(query, buildScheduledTask());
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    expectTransitionsToKilling();
+
+    control.replay();
+
+    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
+  public void testKillQueryActive() throws Exception {
+    Query.Builder query = Query.unscoped().byJob(JOB_KEY);
+    expectAuth(ROOT, true);
+    storageUtil.expectTaskFetch(query.active(), buildScheduledTask());
+    expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(false);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    expectTransitionsToKilling();
+
+    control.replay();
+
+    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
+  public void testKillCronJob() throws Exception {
+    Query.Builder query = Query.jobScoped(JOB_KEY);
+    expectAuth(ROOT, true);
+    storageUtil.expectTaskFetch(query.active());
+    expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(true);
+
+    control.replay();
+
+    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
   public void testKillTasksLockCheckFailed() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
-    IScheduledTask task2 = buildScheduledTask("job_bar");
+    IScheduledTask task2 = buildScheduledTask("job_bar", TASK_ID);
     ILockKey key2 = ILockKey.build(LockKey.job(
         JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, "job_bar").newBuilder()));
     expectAuth(ROOT, false);
     expectAuth(ROLE, true);
-    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME), task2);
+    storageUtil.expectTaskFetch(query, buildScheduledTask(), task2);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
     lockManager.validateIfLocked(key2, Optional.of(LOCK));
     expectLastCall().andThrow(new LockException("Failed lock check."));
@@ -423,7 +481,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     expectAuth(ROOT, false);
     expectAuth(ROLE, false);
-    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME));
+    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME, TASK_ID));
 
     control.replay();
 
@@ -431,20 +489,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testAdminKillTasks() throws Exception {
-    Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
-
-    expectAuth(ROOT, true);
-    scheduler.killTasks(query, USER);
-    storageUtil.expectTaskFetch(query);
-
-    control.replay();
-
-    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
-  }
-
-  @Test
-  public void testKillTasksInvalidJobname() throws Exception {
+  public void testKillTasksInvalidJobName() throws Exception {
     TaskQuery query = new TaskQuery()
         .setOwner(ROLE_IDENTITY)
         .setJobName("");
@@ -457,14 +502,29 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   @Test
   public void testKillNonExistentTasks() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
-
     expectAuth(ROOT, true);
-
-    scheduler.killTasks(query, USER);
     storageUtil.expectTaskFetch(query);
 
     control.replay();
 
+    Response response = thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION);
+    assertOkResponse(response);
+    assertMessageMatches(response, SchedulerThriftInterface.NO_TASKS_TO_KILL_MESSAGE);
+  }
+
+  @Test
+  public void testKillAuthenticatesQueryRole() throws Exception {
+    expectAuth(ROOT, false);
+    expectAuth(ImmutableSet.of("foo", ROLE), true);
+
+    Query.Builder query = Query.roleScoped("foo").active();
+
+    storageUtil.expectTaskFetch(query, buildScheduledTask());
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    expectTransitionsToKilling();
+
+    control.replay();
+
     assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
   }
 
@@ -1502,22 +1562,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillAuthenticatesQueryRole() throws Exception {
-    expectAuth(ROOT, false);
-    expectAuth(ImmutableSet.of("foo", ROLE), true);
-
-    Query.Builder query = Query.roleScoped("foo");
-
-    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME));
-    scheduler.killTasks(query, USER);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-
-    control.replay();
-
-    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
-  }
-
-  @Test
   public void testGetVersion() throws Exception {
     control.replay();
 


Mime
View raw message