aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject incubator-aurora git commit: Avoid performing RPC authentication while holding the write lock.
Date Mon, 26 Jan 2015 23:45:44 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master ff8cdcf09 -> 806eb545e


Avoid performing RPC authentication while holding the write lock.

Bugs closed: AURORA-1045

Reviewed at https://reviews.apache.org/r/30178/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/806eb545
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/806eb545
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/806eb545

Branch: refs/heads/master
Commit: 806eb545e9c366cf41fc130244f2a6fa95c6cf3d
Parents: ff8cdcf
Author: Bill Farner <wfarner@apache.org>
Authored: Mon Jan 26 15:44:28 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Mon Jan 26 15:45:02 2015 -0800

----------------------------------------------------------------------
 .../thrift/SchedulerThriftInterface.java        | 177 +++++++++++--------
 .../thrift/SchedulerThriftInterfaceTest.java    |  50 +++++-
 2 files changed, 151 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/806eb545/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index ac92959..8c19f3b 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -31,6 +31,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
@@ -707,23 +708,44 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     }
   }
 
-  private SessionContext validateSessionKeyForTasks(
-      SessionKey session,
+  private static Query.Builder implicitKillQuery(TaskQuery mutableQuery) {
+    Query.Builder query = Query.arbitrary(mutableQuery);
+    // Unless statuses were specifically supplied, only attempt to kill active tasks.
+    return query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
+  }
+
+  private SessionContext authenticateNonAdminKillingTasks(
       Query.Builder taskQuery,
-      Iterable<IScheduledTask> tasks) throws AuthFailedException {
+      SessionKey session) throws AuthFailedException {
 
     // Authenticate the session against any affected roles, always including the role for a
-    // role-scoped query.  This papers over the implementation detail that dormant cron jobs are
-    // authenticated this way.
-    ImmutableSet.Builder<String> targetRoles = ImmutableSet.<String>builder()
-        .addAll(FluentIterable.from(tasks).transform(GET_ROLE));
+    // role-scoped query.
+    ImmutableSet.Builder<String> targetRoles = ImmutableSet.builder();
     Set<IJobKey> keys = JobKeys.from(taskQuery).or(ImmutableSet.<IJobKey>of());
     targetRoles.addAll(FluentIterable.from(keys).transform(JobKeys.TO_ROLE));
 
     if (taskQuery.get().isSetRole()) {
       targetRoles.add(taskQuery.get().getRole());
     }
-    return sessionValidator.checkAuthenticated(session, targetRoles.build());
+
+    if (taskQuery.get().isSetTaskIds()) {
+      // Note: this operation is weakly-consistent with respect to the transaction that performs
+      // the kill.  This means the task could exit between authentication and kill.  Since the user
+      // asked to kill a task ID (rather than an instance ID, which would outlast a task), this is
+      // considered acceptable.
+      targetRoles.addAll(
+          FluentIterable.from(Storage.Util.fetchTasks(storage, taskQuery)).transform(GET_ROLE));
+    }
+
+    Set<String> explicitRoles = targetRoles.build();
+    // Disallow non-admin users from killing tasks on behalf of arbitrary roles.  Since query fields
+    // are AND-ed together, the presence of a role scope on the query is sufficient for a non-admin
+    // user to perform the operation.
+    if (explicitRoles.isEmpty()) {
+      throw new AuthFailedException("Only an administrator may kill an arbitrary user's tasks.");
+    } else {
+      return sessionValidator.checkAuthenticated(session, explicitRoles);
+    }
   }
 
   private Optional<SessionContext> isAdmin(SessionKey session) {
@@ -736,11 +758,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response killTasks(
-      final TaskQuery mutableQuery,
-      final Lock mutableLock,
-      final SessionKey session) {
-
+  public Response killTasks(TaskQuery mutableQuery, final Lock mutableLock, SessionKey session) {
     requireNonNull(mutableQuery);
     requireNonNull(session);
 
@@ -748,29 +766,28 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return invalidResponse(String.format("Invalid job name: '%s'", mutableQuery.getJobName()));
     }
 
+    final Query.Builder query = implicitKillQuery(mutableQuery);
+    Preconditions.checkState(
+        !query.get().isSetOwner(),
+        "The owner field in a query should have been unset by Query.Builder.");
+
+    Optional<SessionContext> maybeAdminContext = isAdmin(session);
+    final SessionContext context;
+    if (maybeAdminContext.isPresent()) {
+      LOG.info("Granting kill query to admin user: " + query);
+      context = maybeAdminContext.get();
+    } else {
+      try {
+        context = authenticateNonAdminKillingTasks(query, session);
+      } catch (AuthFailedException e) {
+        return errorResponse(AUTH_FAILED, e);
+      }
+    }
+
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
-        Query.Builder query = Query.arbitrary(mutableQuery);
-
-        // Unless statuses were specifically supplied, only attempt to kill active tasks.
-        query = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
-
-        final Set<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
-
-        Optional<SessionContext> maybeAdminContext = isAdmin(session);
-        final SessionContext context;
-        if (maybeAdminContext.isPresent()) {
-          LOG.info("Granting kill query to admin user: " + query);
-          context = maybeAdminContext.get();
-        } else {
-          try {
-            context = validateSessionKeyForTasks(session, query, tasks);
-          } catch (AuthFailedException e) {
-            return errorResponse(AUTH_FAILED, e);
-          }
-        }
-
+        Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
         try {
           validateLockForTasks(
               Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER),
@@ -967,7 +984,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   @Override
   public Response queryRecovery(TaskQuery query, SessionKey session) {
     return okResponse(Result.queryRecoveryResult(new QueryRecoveryResult()
-            .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
+        .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))))));
   }
 
   @Override
@@ -1437,19 +1454,24 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     });
   }
 
-  @Override
-  public Response pauseJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
+  private Response changeJobUpdateState(
+      JobKey mutableJobKey,
+      SessionKey session,
+      final JobUpdateStateChange change) {
+
+    final IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
+    final SessionContext context;
+    try {
+      context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+    } catch (AuthFailedException e) {
+      return errorResponse(AUTH_FAILED, e);
+    }
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
         try {
-          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
-          SessionContext context = sessionValidator
-              .checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-          jobUpdateController.pause(jobKey, context.getIdentity());
+          change.modifyUpdate(jobUpdateController, jobKey, context.getIdentity());
           return okEmptyResponse();
-        } catch (AuthFailedException e) {
-          return errorResponse(AUTH_FAILED, e);
         } catch (UpdateStateException e) {
           return errorResponse(INVALID_REQUEST, e);
         }
@@ -1457,44 +1479,51 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     });
   }
 
+  private interface JobUpdateStateChange {
+    void modifyUpdate(JobUpdateController controller, IJobKey job, String invokingUser)
+        throws UpdateStateException;
+  }
+
+  private static final JobUpdateStateChange PAUSE = new JobUpdateStateChange() {
+    @Override
+    public void modifyUpdate(JobUpdateController controller, IJobKey job, String invokingUser)
+        throws UpdateStateException {
+
+      controller.pause(job, invokingUser);
+    }
+  };
+
+  private static final JobUpdateStateChange RESUME = new JobUpdateStateChange() {
+    @Override
+    public void modifyUpdate(JobUpdateController controller, IJobKey job, String invokingUser)
+        throws UpdateStateException {
+
+      controller.resume(job, invokingUser);
+    }
+  };
+
+  private static final JobUpdateStateChange ABORT = new JobUpdateStateChange() {
+    @Override
+    public void modifyUpdate(JobUpdateController controller, IJobKey job, String invokingUser)
+        throws UpdateStateException {
+
+      controller.abort(job, invokingUser);
+    }
+  };
+
+  @Override
+  public Response pauseJobUpdate(JobKey mutableJobKey, SessionKey session) {
+    return changeJobUpdateState(mutableJobKey, session, PAUSE);
+  }
+
   @Override
   public Response resumeJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
-    return storage.write(new MutateWork.Quiet<Response>() {
-      @Override
-      public Response apply(MutableStoreProvider storeProvider) {
-        try {
-          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
-          SessionContext context = sessionValidator
-              .checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-          jobUpdateController.resume(jobKey, context.getIdentity());
-          return okEmptyResponse();
-        } catch (AuthFailedException e) {
-          return errorResponse(AUTH_FAILED, e);
-        } catch (UpdateStateException e) {
-          return errorResponse(INVALID_REQUEST, e);
-        }
-      }
-    });
+    return changeJobUpdateState(mutableJobKey, session, RESUME);
   }
 
   @Override
   public Response abortJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
-    return storage.write(new MutateWork.Quiet<Response>() {
-      @Override
-      public Response apply(MutableStoreProvider storeProvider) {
-        try {
-          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
-          SessionContext context = sessionValidator
-              .checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
-          jobUpdateController.abort(jobKey, context.getIdentity());
-          return okEmptyResponse();
-        } catch (AuthFailedException e) {
-          return errorResponse(AUTH_FAILED, e);
-        } catch (UpdateStateException e) {
-          return errorResponse(INVALID_REQUEST, e);
-        }
-      }
-    });
+    return changeJobUpdateState(mutableJobKey, session, ABORT);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/806eb545/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 77a60f0..03d1fba 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -802,11 +802,57 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testKillByTaskId() throws Exception {
+    // A non-admin user may kill their own tasks when specified by task IDs.
+    Query.Builder query = Query.taskScoped("taskid");
+    expectAuth(ROOT, false);
+    expectAuth(ImmutableSet.of(ROLE), true);
+    // This query happens twice - once for authentication (without consistency) and once again
+    // to perform the state change (within a write transaction).
+    storageUtil.expectTaskFetch(query.active(), buildScheduledTask()).times(2);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    expectTransitionsToKilling();
+
+    control.replay();
+
+    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
+  public void testKillByStatus() throws Exception {
+    // A non-admin user may not kill arbitrary tasks.
+    Query.Builder query = Query.statusScoped(ScheduleStatus.RUNNING);
+    expectAuth(ROOT, false);
+
+    control.replay();
+
+    assertResponse(AUTH_FAILED, thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
+  public void testKillWithRoleSpecs() throws Exception {
+    // The query performed here is somewhat nonsensical, since we would not have any tasks owned by
+    // multiple roles.  However, that behavior is defined in the storage system.
+    Query.Builder query = Query.arbitrary(new TaskQuery()
+        .setRole("a")
+        .setJobKeys(ImmutableSet.of(JobKeys.from("b", "devel", "job").newBuilder())));
+
+    expectAuth(ROOT, false);
+    expectAuth(ImmutableSet.of("a", "b"), true);
+    storageUtil.expectTaskFetch(query.active(), buildScheduledTask());
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    expectTransitionsToKilling();
+
+    control.replay();
+
+    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
   public void testKillTasksAuthFailure() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     expectAuth(ROOT, false);
     expectAuth(ROLE, false);
-    storageUtil.expectTaskFetch(query, buildScheduledTask(JOB_NAME, TASK_ID));
 
     control.replay();
 
@@ -840,7 +886,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   @Test
   public void testKillAuthenticatesQueryRole() throws Exception {
     expectAuth(ROOT, false);
-    expectAuth(ImmutableSet.of("foo", ROLE), true);
+    expectAuth(ImmutableSet.of("foo"), true);
 
     Query.Builder query = Query.roleScoped("foo").active();
 


Mime
View raw message