mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d..@apache.org
Subject git commit: Split authorization from task validation.
Date Tue, 21 Oct 2014 20:02:04 GMT
Repository: mesos
Updated Branches:
  refs/heads/master 5ccf43e5a -> 526e1ee61


Split authorization from task validation.

Review: https://reviews.apache.org/r/26817


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/526e1ee6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/526e1ee6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/526e1ee6

Branch: refs/heads/master
Commit: 526e1ee615806e7f64b9bc137f6784b809aa4c55
Parents: 5ccf43e
Author: Dominic Hamon <dhamon@twitter.com>
Authored: Thu Oct 16 11:47:43 2014 -0700
Committer: Dominic Hamon <dhamon@twitter.com>
Committed: Tue Oct 21 13:01:37 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 86 +++++++++++++++++++++++++++++++++-------------
 src/master/master.hpp | 16 ++++++---
 2 files changed, 74 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/526e1ee6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0a5c9a3..be910d9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2316,10 +2316,10 @@ void Master::launchTasks(
             << " on slave " << *slave
             << " for framework " << *framework;
 
-  // Validate each task and launch if valid.
-  list<Future<Option<Error> > > futures;
+  // Validate each task.
+  vector<Option<Error>> validations;
   foreach (const TaskInfo& task, tasks) {
-    futures.push_back(validateTask(task, framework, slave, used));
+    validations.push_back(validateTask(task, framework, slave, used));
 
     // Add to pending tasks.
     // NOTE: We need to do this here after validation because of the
@@ -2329,22 +2329,28 @@ void Master::launchTasks(
     stats.tasks[TASK_STAGING]++;
   }
 
-  // Wait for all the tasks to be validated.
-  // NOTE: We wait for all tasks because currently the allocator
-  // is expected to get 'resourcesRecovered()' once per 'launchTasks()'.
+  // Authorize each task.
+  list<Future<bool>> futures;
+  foreach (const TaskInfo& task, tasks) {
+    // TODO(dhamon): Only authorize if there's no validation error.
+    futures.push_back(authorizeTask(task, framework));
+  }
+
+  // Wait for all the tasks to be authorized.
   await(futures)
     .onAny(defer(self(),
                  &Master::_launchTasks,
-                 framework->id,
+                 frameworkId,
                  slaveId.get(),
                  tasks,
                  used,
                  filters,
+                 validations,
                  lambda::_1));
 }
 
 
-Future<Option<Error> > Master::validateTask(
+Option<Error> Master::validateTask(
     const TaskInfo& task,
     Framework* framework,
     Slave* slave,
@@ -2381,9 +2387,17 @@ Future<Option<Error> > Master::validateTask(
     return Error(error.get().message);
   }
 
+  return None();
+}
+
+
+Future<bool> Master::authorizeTask(
+    const TaskInfo& task,
+    Framework* framework)
+{
   if (authorizer.isNone()) {
     // Authorization is disabled.
-    return None();
+    return true;
   }
 
   // Authorize the task.
@@ -2407,10 +2421,7 @@ Future<Option<Error> > Master::validateTask(
   }
   request.mutable_users()->add_values(user);
 
-  return authorizer.get()->authorize(request).then(
-      lambda::bind(&_authorize,
-                   "Not authorized to launch as user '" + user + "'",
-                   lambda::_1));
+  return authorizer.get()->authorize(request);
 }
 
 
@@ -2482,10 +2493,12 @@ void Master::_launchTasks(
     const vector<TaskInfo>& tasks,
     const Resources& totalResources,
     const Filters& filters,
-    const Future<list<Future<Option<Error> > > >& validationErrors)
+    const vector<Option<Error>>& validations,
+    const Future<list<Future<bool>>>& authorizations)
 {
-  CHECK_READY(validationErrors);
-  CHECK_EQ(validationErrors.get().size(), tasks.size());
+  CHECK_EQ(validations.size(), tasks.size());
+  CHECK_READY(authorizations);
+  CHECK_EQ(authorizations.get().size(), tasks.size());
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -2524,8 +2537,11 @@ void Master::_launchTasks(
   Resources usedResources; // Accumulated resources used.
 
   size_t index = 0;
-  foreach (const Future<Option<Error> >& future, validationErrors.get())
{
-    const TaskInfo& task = tasks[index++];
+  foreach (const Future<bool>& authorization, authorizations.get()) {
+    const TaskInfo& task = tasks[index];
+    const Option<Error>& validation = validations[index];
+
+    ++index;
 
     // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
     // for the task was called before we are here.
@@ -2535,18 +2551,40 @@ void Master::_launchTasks(
 
     framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
 
-    CHECK(!future.isDiscarded());
-    if (future.isFailed() || future.get().isSome()) {
-      const string error = future.isFailed()
-          ? "Authorization failure: " + future.failure()
-          : future.get().get().message;
+    if (validation.isSome()) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          validation.get().message);
+
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
+      forward(update, UPID(), framework);
+
+      continue;
+    }
+
+    CHECK(!authorization.isDiscarded());
+
+    if (authorization.isFailed() || !authorization.get()) {
+      string user = framework->info.user(); // Default user.
+      if (task.has_command() && task.command().has_user()) {
+        user = task.command().user();
+      } else if (task.has_executor() && task.executor().command().has_user()) {
+        user = task.executor().command().user();
+      }
 
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          error);
+          authorization.isFailed() ?
+              "Authorization failure: " + authorization.failure() :
+              "Not authorized to launch as user '" + user + "'");
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;

http://git-wip-us.apache.org/repos/asf/mesos/blob/526e1ee6/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 14f1d0f..18898e9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -346,16 +346,23 @@ protected:
       const std::vector<StatusUpdate>& updates,
       const process::Future<bool>& removed);
 
-  // Validates the task including authorization.
+  // Validates the task.
   // Returns None if the task is valid.
   // Returns Error if the task is invalid.
-  // Returns Failure if authorization returns 'Failure'.
-  process::Future<Option<Error> > validateTask(
+  Option<Error> validateTask(
       const TaskInfo& task,
       Framework* framework,
       Slave* slave,
       const Resources& totalResources);
 
+  // Authorizes the task.
+  // Returns true if task is authorized.
+  // Returns false if task is not authorized.
+  // Returns failure for transient authorization failures.
+  process::Future<bool> authorizeTask(
+      const TaskInfo& task,
+      Framework* framework);
+
   // Launch a task from a task description.
   void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
 
@@ -366,7 +373,8 @@ protected:
       const std::vector<TaskInfo>& tasks,
       const Resources& totalResources,
       const Filters& filters,
-      const process::Future<std::list<process::Future<Option<Error> > >
>& f);
+      const std::vector<Option<Error>>& validations,
+      const process::Future<std::list<process::Future<bool>>>& authorizations);
 
   // Transitions the task, and recovers resources if the task becomes
   // terminal.


Mime
View raw message