aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject git commit: Make thermos more cognizant of user deletions
Date Mon, 06 Oct 2014 17:49:57 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9834b316a -> aad9a5a18


Make thermos more cognizant of user deletions

This is change 1 of 2 for AURORA-175.  The second change adds an exit
status contract between Aurora and Thermos.

This change allows a process in the Thermos state machine to go directly
from WAITING -> FAILED if the user associated with that process has been
deleted.  It also persists a 'uid' field in the Thermos RunnerHeader so that
we can check against process UIDs, which in theory have higher fidelity than
usernames.

Bugs closed: AURORA-175

Reviewed at https://reviews.apache.org/r/25972/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/aad9a5a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/aad9a5a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/aad9a5a1

Branch: refs/heads/master
Commit: aad9a5a182b9fbf82ad6048bae0373014f4f10b4
Parents: 9834b31
Author: Brian Wickman <wickman@apache.org>
Authored: Mon Oct 6 10:49:51 2014 -0700
Committer: Brian Wickman <wickman@apache.org>
Committed: Mon Oct 6 10:49:51 2014 -0700

----------------------------------------------------------------------
 src/main/python/apache/thermos/common/ckpt.py   | 41 ++++++------
 src/main/python/apache/thermos/core/helper.py   | 56 ++++++++++++-----
 src/main/python/apache/thermos/core/process.py  | 41 ++++++------
 src/main/python/apache/thermos/core/runner.py   | 39 ++++++++----
 .../org/apache/thermos/thermos_internal.thrift  |  1 +
 .../python/apache/thermos/core/test_helper.py   | 65 +++++++++++++-------
 .../python/apache/thermos/core/test_process.py  | 11 +++-
 7 files changed, 161 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/main/python/apache/thermos/common/ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/ckpt.py b/src/main/python/apache/thermos/common/ckpt.py
index 7df179b..e79ec6a 100644
--- a/src/main/python/apache/thermos/common/ckpt.py
+++ b/src/main/python/apache/thermos/common/ckpt.py
@@ -56,9 +56,9 @@ class ProcessStateHandler(object):
 
     () - starting state, [] - terminal state
 
-                             [FAILED]
-                                ^
-                                |
+      .--------------------> [FAILED]
+      |                         ^
+      |                         |
   (WAITING) ----> FORKED ----> RUNNING -----> [KILLED]
                     |          |    |
                     v          |    `---> [SUCCESS]
@@ -139,7 +139,7 @@ def assert_nonempty(state, fields):
     assert getattr(state, field, None) is not None, "Missing field %s from %s!" % (field, state)
 
 
-def copy_fields(state, state_update, fields):
+def copy_fields(state, state_update, *fields):
   assert_nonempty(state_update, fields)
   for field in fields:
     setattr(state, field, getattr(state_update, field))
@@ -247,51 +247,48 @@ class CheckpointDispatcher(object):
               ProcessState._VALUES_TO_NAMES.get(process_state.state),
               ProcessState._VALUES_TO_NAMES.get(process_state_update.state)))
 
+    # always copy the sequence id; 'state' and transition-specific fields are copied below
+    copy_fields(process_state, process_state_update, 'seq')
+
     # CREATION => WAITING
     if process_state_update.state == ProcessState.WAITING:
       assert_process_state_in(None)
-      required_fields = ['seq', 'state', 'process']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state', 'process')
 
     # WAITING => FORKED
     elif process_state_update.state == ProcessState.FORKED:
       assert_process_state_in(ProcessState.WAITING)
-      required_fields = ['seq', 'state', 'fork_time', 'coordinator_pid']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state', 'fork_time', 'coordinator_pid')
 
     # FORKED => RUNNING
     elif process_state_update.state == ProcessState.RUNNING:
       assert_process_state_in(ProcessState.FORKED)
-      required_fields = ['seq', 'state', 'start_time', 'pid']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state', 'start_time', 'pid')
 
     # RUNNING => SUCCESS
     elif process_state_update.state == ProcessState.SUCCESS:
       assert_process_state_in(ProcessState.RUNNING)
-      required_fields = ['seq', 'state', 'stop_time', 'return_code']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state', 'stop_time', 'return_code')
 
-    # RUNNING => FAILED
+    # {WAITING, RUNNING} => FAILED (WAITING => FAILED: the process could not be forked)
     elif process_state_update.state == ProcessState.FAILED:
-      assert_process_state_in(ProcessState.RUNNING)
-      required_fields = ['seq', 'state', 'stop_time', 'return_code']
-      copy_fields(process_state, process_state_update, required_fields)
+      assert_process_state_in(ProcessState.WAITING, ProcessState.RUNNING)
+      if process_state.state == ProcessState.RUNNING:  # prior state; 'state' is overwritten below
+        copy_fields(process_state, process_state_update, 'stop_time', 'return_code')
+      copy_fields(process_state, process_state_update, 'state')
 
     # {FORKED, RUNNING} => KILLED
     elif process_state_update.state == ProcessState.KILLED:
       assert_process_state_in(ProcessState.FORKED, ProcessState.RUNNING)
-      required_fields = ['seq', 'state', 'stop_time', 'return_code']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state', 'stop_time', 'return_code')
 
     # {FORKED, RUNNING} => LOST
     elif process_state_update.state == ProcessState.LOST:
       assert_process_state_in(ProcessState.FORKED, ProcessState.RUNNING)
-      required_fields = ['seq', 'state']
-      copy_fields(process_state, process_state_update, required_fields)
+      copy_fields(process_state, process_state_update, 'state')
 
     else:
-      raise cls.ErrorRecoveringState(
-        "Unknown state = %s" % process_state_update.state)
+      raise cls.ErrorRecoveringState("Unknown state = %s" % process_state_update.state)
 
   def would_update(self, state, runner_ckpt):
     """

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/main/python/apache/thermos/core/helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py
index 82c68f3..8cd3294 100644
--- a/src/main/python/apache/thermos/core/helper.py
+++ b/src/main/python/apache/thermos/core/helper.py
@@ -86,33 +86,58 @@ class TaskRunnerHelper(object):
     return None
 
   @classmethod
-  def this_is_really_our_pid(cls, process, current_user, start_time):
+  def this_is_really_our_pid(cls, process, uid, user, start_time):
     """
       A heuristic to make sure that this is likely the pid that we own/forked.  Necessary
       because of pid-space wrapping.  We don't want to go and kill processes we don't own,
       especially if the killer is running as root.
 
       process: psutil.Process representing the process to check
-      current_user: user expected to own the process
+      uid: uid expected to own the process (or None if not available)
+      user: username expected to own the process
       start_time: time at which it's expected the process has started
 
       Raises:
         psutil.NoSuchProcess - if the Process supplied no longer exists
     """
-    process_username = process.username()
     process_create_time = process.create_time()
 
-    if process_username != current_user:
-      log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
-        process.pid, process_username, current_user))
-      return False
-
     if abs(start_time - process_create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
       log.info("Expected pid %s start time to be %s but it's %s" % (
-        process.pid, start_time, process_create_time))
+          process.pid, start_time, process_create_time))
+      return False
+
+    if uid is not None:
+      # If the uid was provided, it is gospel, so do not consider user.
+      try:
+        uids = process.uids()
+        if uids is None:
+          return False
+        process_uid = uids.real
+      except psutil.Error:
+        return False
+
+      if process_uid == uid:
+        return True
+      else:
+        log.info("Expected pid %s to be ours but the pid uid is %s and we're %s" % (
+            process.pid, process_uid, uid))
+        return False
+
+    try:
+      process_user = process.username()
+    except KeyError:
       return False
 
-    return True
+    if process_user == user:
+      return True
+
+    # If the uid was not provided, we must fall back to user -- which is possibly flaky if
+    # the user gets deleted from the system (process.username() then raises KeyError and
+    # we return False above).
+    log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
+        process.pid, process_user, user))
+    return False
 
   @classmethod
   def scan_process(cls, state, process_name):
@@ -123,14 +148,17 @@ class TaskRunnerHelper(object):
 
     """
     process_run = state.processes[process_name][-1]
-    process_owner = state.header.user
+    user, uid = state.header.user, state.header.uid
 
     coordinator_pid, pid, tree = None, None, set()
 
+    if uid is None:
+      log.debug('Legacy thermos checkpoint stream detected, user = %s' % user)
+
     if process_run.coordinator_pid:
       try:
         coordinator_process = psutil.Process(process_run.coordinator_pid)
-        if cls.this_is_really_our_pid(coordinator_process, process_owner, process_run.fork_time):
+        if cls.this_is_really_our_pid(coordinator_process, uid, user, process_run.fork_time):
           coordinator_pid = process_run.coordinator_pid
       except psutil.NoSuchProcess:
         log.info('  Coordinator %s [pid: %s] completed.' % (process_run.process,
@@ -142,7 +170,7 @@ class TaskRunnerHelper(object):
     if process_run.pid:
       try:
         process = psutil.Process(process_run.pid)
-        if cls.this_is_really_our_pid(process, process_owner, process_run.start_time):
+        if cls.this_is_really_our_pid(process, uid, user, process_run.start_time):
           pid = process.pid
       except psutil.NoSuchProcess:
         log.info('  Process %s [pid: %s] completed.' % (process_run.process, process_run.pid))
@@ -158,7 +186,7 @@ class TaskRunnerHelper(object):
     return (coordinator_pid, pid, tree)
 
   @classmethod
-  def scantree(cls, state):
+  def scan_tree(cls, state):
     """
       Scan the process tree associated with the provided task state.
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index 4889e63..5ce138d 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -93,10 +93,6 @@ class ProcessBase(object):
     self._stdout = None
     self._stderr = None
     self._user = user
-    if self._user:
-      user, current_user = self._getpwuid()  # may raise self.UnknownUserError
-      if user != current_user and os.geteuid() != 0:
-        raise self.PermissionError('Must be root to run processes as other users!')
     self._ckpt = None
     self._ckpt_head = -1
     if platform is None:
@@ -106,6 +102,19 @@ class ProcessBase(object):
   def _log(self, msg):
     log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg))
 
+  def _getpwuid(self):
+    """Returns a tuple of the user (i.e. --user) and current user."""
+    uid = os.getuid()
+    try:
+      current_user = pwd.getpwuid(uid)
+    except KeyError:
+      raise self.UnknownUserError('Unknown uid %s!' % uid)
+    try:
+      user = pwd.getpwnam(self._user) if self._user is not None else current_user
+    except KeyError:
+      raise self.UnknownUserError('Unable to get pwent information!')
+    return user, current_user
+
   def _ckpt_write(self, msg):
     self._init_ckpt_if_necessary()
     self._log("child state transition [%s] <= %s" % (self.ckpt_file(), msg))
@@ -193,6 +202,9 @@ class ProcessBase(object):
 
   def _prepare_fork(self):
     user, current_user = self._getpwuid()
+    if self._user:
+      if user != current_user and os.geteuid() != 0:
+        raise self.PermissionError('Must be root to run processes as other users!')
     uid, gid = user.pw_uid, user.pw_gid
     self._fork_time = self._platform.clock().time()
     self._setup_ckpt()
@@ -206,18 +218,6 @@ class ProcessBase(object):
     self._ckpt.close()
     self._ckpt = None
 
-  def _getpwuid(self):
-    """Returns a tuple of the user (i.e. --user) and current user."""
-    try:
-      current_user = pwd.getpwuid(os.getuid())
-    except KeyError:
-      raise self.UnknownUserError('Unknown user %s!' % self._user)
-    try:
-      user = pwd.getpwnam(self._user) if self._user else current_user
-    except KeyError:
-      raise self.UnknownUserError('Unable to get pwent information!')
-    return user, current_user
-
   def start(self):
     """
       This is the main call point from the runner, and forks a co-ordinator process to run the
@@ -226,18 +226,21 @@ class ProcessBase(object):
       The parent returns immediately and populates information about the pid of the co-ordinator.
       The child (co-ordinator) will launch the target process in a subprocess.
     """
-    self._prepare_fork()
+    self._prepare_fork()  # can raise:
+                          #    UnknownUserError (from _getpwuid)
+                          #    PermissionError (other user requested without root)
+                          #    CheckpointError (from _setup_ckpt)
     self._pid = self._platform.fork()
     if self._pid == 0:
       self._pid = self._platform.getpid()
-      self._wait_for_control()
+      self._wait_for_control()  # can raise CheckpointError (propagates out of start() in child)
       try:
         self.execute()
       finally:
         self._ckpt.close()
         self.finish()
     else:
-      self._finalize_fork()
+      self._finalize_fork()  # can raise CheckpointError
 
   def execute(self):
     raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 31f4071..8aac6b5 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -40,6 +40,7 @@ terminal state.
 """
 
 import os
+import pwd
 import socket
 import sys
 import time
@@ -366,8 +367,8 @@ class TaskRunner(object):
     synthesised from an existing task's checkpoint root
   """
   class Error(Exception): pass
-  class InvalidTask(Error): pass
   class InternalError(Error): pass
+  class InvalidTask(Error): pass
   class PermissionError(Error): pass
   class StateError(Error): pass
 
@@ -613,14 +614,22 @@ class TaskRunner(object):
       been constructed.
     """
     if self._state.header is None:
+      try:
+        uid = pwd.getpwnam(self._user).pw_uid
+      except KeyError:
+        # Persist uid=None.  Forking processes as this user is then expected to fail
+        # downstream, and those failures are recorded as FAILED process states.
+        log.error('Unknown user %s.' % self._user)
+        uid = None
       header = RunnerHeader(
-        task_id=self._task_id,
-        launch_time_ms=int(self._launch_time * 1000),
-        sandbox=self._sandbox,
-        log_dir=self._log_dir,
-        hostname=socket.gethostname(),
-        user=self._user,
-        ports=self._portmap)
+          task_id=self._task_id,
+          launch_time_ms=int(self._launch_time * 1000),
+          sandbox=self._sandbox,
+          log_dir=self._log_dir,
+          hostname=socket.gethostname(),
+          user=self._user,
+          uid=uid,
+          ports=self._portmap)
       runner_ckpt = RunnerCkpt(runner_header=header)
       self._dispatcher.dispatch(self._state, runner_ckpt)
 
@@ -658,7 +667,7 @@ class TaskRunner(object):
       log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
         ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
     runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
-      process=process_name, state=process_state, seq=sequence_number, **kw))
+        process=process_name, state=process_state, seq=sequence_number, **kw))
     self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
 
   def _task_process_from_process_name(self, process_name, sequence_number):
@@ -776,8 +785,12 @@ class TaskRunner(object):
         self._set_process_status(process_name, ProcessState.WAITING)
         tp = self._task_processes[process_name]
       log.info('Forking Process(%s)' % process_name)
-      tp.start()
-      launched.append(tp)
+      try:
+        tp.start()
+        launched.append(tp)
+      except Process.Error as e:  # NOTE(review): confirm start() cannot raise this in the forked child
+        log.error('Failed to launch process: %s' % e)
+        self._set_process_status(process_name, ProcessState.FAILED)
 
     return len(launched) > 0
 
@@ -791,7 +804,7 @@ class TaskRunner(object):
     """
       Returns True if any processes associated with this task have active pids.
     """
-    process_tree = TaskRunnerHelper.scantree(self.state)
+    process_tree = TaskRunnerHelper.scan_tree(self.state)
     return any(any(process_set) for process_set in process_tree.values())
 
   def has_active_processes(self):
@@ -896,7 +909,7 @@ class TaskRunner(object):
     self.kill(force, preemption_wait=Amount(0, Time.SECONDS), terminal_status=TaskState.LOST)
 
   def _kill(self):
-    processes = TaskRunnerHelper.scantree(self._state)
+    processes = TaskRunnerHelper.scan_tree(self._state)
     for process, pid_tuple in processes.items():
       current_run = self._current_process_run(process)
       coordinator_pid, pid, tree = pid_tuple

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/main/thrift/org/apache/thermos/thermos_internal.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/thermos/thermos_internal.thrift b/src/main/thrift/org/apache/thermos/thermos_internal.thrift
index 0cea105..2c449a4 100644
--- a/src/main/thrift/org/apache/thermos/thermos_internal.thrift
+++ b/src/main/thrift/org/apache/thermos/thermos_internal.thrift
@@ -82,6 +82,7 @@ struct RunnerHeader {
   7: string log_dir
   4: string hostname        // kill this
   5: string user
+  8: i32    uid             // uid of user at launch, checked in case user disappears; unset for legacy checkpoints or unknown users
   6: map<string, i64> ports
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/test/python/apache/thermos/core/test_helper.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_helper.py b/src/test/python/apache/thermos/core/test_helper.py
index 53f1e46..fe368ff 100644
--- a/src/test/python/apache/thermos/core/test_helper.py
+++ b/src/test/python/apache/thermos/core/test_helper.py
@@ -23,36 +23,57 @@ from apache.thermos.core.helper import TaskRunnerHelper as TRH
 from gen.apache.thermos.ttypes import ProcessStatus, RunnerHeader, RunnerState
 
 USER1 = 'user1'
+UID = 567
 PID = 12345
 CREATE_TIME = time.time()
 PROCESS_NAME = 'my_process'
 COORDINATOR_PID = 13337
 
 
-def test_this_is_really_our_pid():
-  process = mock.Mock(spec=psutil.Process, pid=PID)
-  process.username.return_value = USER1
+def set_side_effect(mock_obj, value):
+  if isinstance(value, Exception):
+    mock_obj.side_effect = value
+  else:
+    mock_obj.return_value = value
+
+
+def mock_process(pid, username, uid=None):
+  process = mock.Mock(spec=psutil.Process, pid=pid)
+  set_side_effect(process.uids, uid)
+  set_side_effect(process.username, username)
   process.create_time.return_value = CREATE_TIME
-  assert TRH.this_is_really_our_pid(
-      process,
-      process.username(),
-      process.create_time())
-  assert TRH.this_is_really_our_pid(
-      process,
-      process.username(),
+  return process
+
+
+def test_this_is_really_our_pid():
+  process = mock_process(PID, USER1, uid=UID)
+  assert TRH.this_is_really_our_pid(process, UID, USER1, process.create_time())
+  assert TRH.this_is_really_our_pid(process, UID, USER1,
       process.create_time() + TRH.MAX_START_TIME_DRIFT.as_(Time.SECONDS) - 1)
-  assert not TRH.this_is_really_our_pid(
-      process,
-      'user2',
-      process.create_time())
-  assert not TRH.this_is_really_our_pid(
-      process,
-      process.username(),
-      process.create_time() + TRH.MAX_START_TIME_DRIFT.as_(Time.SECONDS) + 1)
-  assert not TRH.this_is_really_our_pid(
-      process,
-      process.username(),
-      process.create_time() - (TRH.MAX_START_TIME_DRIFT.as_(Time.SECONDS) + 1))
+  assert TRH.this_is_really_our_pid(process, UID, 'user2', process.create_time()), (
+      'An equivalent UID is considered the same user.')
+  assert not TRH.this_is_really_our_pid(process, UID + 1, USER1, process.create_time()), (
+      'UIDs should not match.')
+  assert not TRH.this_is_really_our_pid(process, UID, USER1,
+      process.create_time() + TRH.MAX_START_TIME_DRIFT.as_(Time.SECONDS) + 1), (
+          'Process is outside of start time drift.')
+  assert not TRH.this_is_really_our_pid(process, UID, USER1,
+      process.create_time() - (TRH.MAX_START_TIME_DRIFT.as_(Time.SECONDS) + 1)), (
+          'Process is outside of start time drift.')
+  assert not TRH.this_is_really_our_pid(process, None, 'user2', process.create_time()), (
+      "If no uid is checkpointed but the username is different, we can't know it's ours.")
+
+  process = mock_process(PID, USER1)
+  assert not TRH.this_is_really_our_pid(process, UID, USER1, process.create_time()), (
+      'We cannot validate whether this is our process without a process UID.')
+  assert TRH.this_is_really_our_pid(process, None, USER1, process.create_time()), (
+      'Previous behavior is preserved.')
+
+  process = mock_process(PID, username=KeyError('Unknown user'), uid=UID)
+  assert TRH.this_is_really_our_pid(process, UID, USER1, process.create_time())
+  assert not TRH.this_is_really_our_pid(process, None, USER1, process.create_time())
+  assert TRH.this_is_really_our_pid(process, UID, 'user2', process.create_time()), (
+      'If the user has been renamed but the UID is the same, this is still our process.')
 
 
 TRH_PATH = 'apache.thermos.core.helper.TaskRunnerHelper'

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aad9a5a1/src/test/python/apache/thermos/core/test_process.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py
index 223393d..e261249 100644
--- a/src/test/python/apache/thermos/core/test_process.py
+++ b/src/test/python/apache/thermos/core/test_process.py
@@ -107,10 +107,15 @@ def test_other_user_fails_nonroot():
   with temporary_dir() as td:
     taskpath = TaskPath(root=td, task_id='task', process='process', run=0)
     sandbox = setup_sandbox(td, taskpath)
-
+    process = TestProcess(
+        'process',
+        'echo hello world',
+        0,
+        taskpath,
+        sandbox,
+        user=get_other_nonroot_user().pw_name)  # construction no longer checks permissions
     with pytest.raises(Process.PermissionError):
-      TestProcess('process', 'echo hello world', 0, taskpath, sandbox,
-            user=get_other_nonroot_user().pw_name)
+      process.start()  # PermissionError is now raised from _prepare_fork at start time
 
 
 def test_log_permissions():


Mime
View raw message