aurora-commits mailing list archives

From wfar...@apache.org
Subject [10/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/path.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/path.py b/src/main/python/twitter/thermos/common/path.py
deleted file mode 100644
index 28e07ac..0000000
--- a/src/main/python/twitter/thermos/common/path.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import os
-
-
-class TaskPath(object):
-  """
-    Handle the resolution / detection of the path structure for thermos tasks.
-
-    This is used by the runner to determine where it should be dumping checkpoints and writing
-    stderr/stdout, and by the observer to determine how to detect the running tasks on the system.
-
-    Examples:
-      # 'root' seeds the substitution dictionary for DIR_TEMPLATE
-      pathspec = TaskPath(root = "/var/run/thermos")
-
-      # given() supplies further substitutions for DIR_TEMPLATE, and
-      # getpath() selects which template to interpolate:
-      pathspec.given(task_id = "12345-thermos-wickman-23", state='active').getpath("task_path")
-
-    As a detection mechanism:
-      path_glob = pathspec.given(task_id = "*").getpath(task_type)
-      matching_paths = glob.glob(path_glob)
-
-      path_re = pathspec.given(task_id = "(\S+)").getpath(task_type)
-      path_re = re.compile(path_re)
-
-      ids = []
-      for path in matching_paths:
-        matched_blobs = path_re.match(path).groups()
-        ids.append(int(matched_blobs[0]))
-      return ids
-  """
-
-  class UnknownPath(Exception): pass
-  class UnderspecifiedPath(Exception): pass
-
-  DEFAULT_CHECKPOINT_ROOT = "/var/run/thermos"
-  KNOWN_KEYS = [ 'root', 'task_id', 'state', 'process', 'run', 'log_dir' ]
-  LEGACY_KNOWN_KEYS = KNOWN_KEYS[:-1]
-
-  DIR_TEMPLATE = {
-            'task_path': ['%(root)s',       'tasks',   '%(state)s', '%(task_id)s'],
-      'checkpoint_path': ['%(root)s', 'checkpoints', '%(task_id)s'],
-    'runner_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'runner'],
-   'process_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'coordinator.%(process)s'],
-      'process_logbase': ['%(log_dir)s'],
-       'process_logdir': ['%(log_dir)s', '%(process)s', '%(run)s']
-  }
-
-  LEGACY_DIR_TEMPLATE = DIR_TEMPLATE.copy()
-  LEGACY_DIR_TEMPLATE.update(
-      process_logbase = ['%(root)s', 'logs', '%(task_id)s'],
-      process_logdir  = ['%(root)s', 'logs', '%(task_id)s', '%(process)s', '%(run)s']
-  )
-
-  def __init__(self, **kw):
-    self._filename = None
-    # initialize with self-interpolating values
-    if kw.get('root') is None:
-      kw['root'] = self.DEFAULT_CHECKPOINT_ROOT
-    # Before log_dir was added explicitly to RunnerHeader, it resolved to %(root)s/logs
-    if kw.get('log_dir'):
-      self._template, keys = self.DIR_TEMPLATE, self.KNOWN_KEYS
-    else:
-      self._template, keys = self.LEGACY_DIR_TEMPLATE, self.LEGACY_KNOWN_KEYS
-    self._data = dict((key, '%%(%s)s' % key) for key in keys)
-    self._data.update(kw)
-
-  def given(self, **kw):
-    """ Perform further interpolation of the templates given the kwargs """
-    eval_dict = dict(self._data) # copy
-    eval_dict.update(kw)
-    tp = TaskPath(**eval_dict)
-    tp._filename = self._filename
-    return tp
-
-  def with_filename(self, filename):
-    """ Return a TaskPath with the specific filename appended to the end of the path """
-    wp = TaskPath(**self._data)
-    wp._filename = filename
-    return wp
-
-  def getpath(self, pathname):
-    if pathname not in self._template:
-      raise self.UnknownPath("Internal error, unknown id: %s" % pathname)
-    path = self._template[pathname][:]
-
-    if self._filename:
-      path += [self._filename]
-    path = os.path.join(*path)
-    interpolated_path = path % self._data
-    try:
-      _ = interpolated_path % {}
-    except KeyError:
-      raise TaskPath.UnderspecifiedPath(
-        "Tried to interpolate path with insufficient variables: %s as %s" % (
-        pathname, interpolated_path))
-    return interpolated_path
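
For illustration, a minimal sketch of how the TaskPath templates above compose (the root and task_id values are made up):

    import glob
    from twitter.thermos.common.path import TaskPath

    # 'root' seeds the substitution dictionary; every other known key starts
    # out as a self-referential '%(key)s' placeholder.
    pathspec = TaskPath(root='/var/run/thermos')

    # Fully specified: /var/run/thermos/tasks/active/12345-thermos-wickman-23
    task_path = pathspec.given(
        task_id='12345-thermos-wickman-23', state='active').getpath('task_path')

    # Omitting a key leaves its '%(key)s' placeholder behind, and getpath()
    # raises TaskPath.UnderspecifiedPath.

    # Detection: substitute a glob for task_id and enumerate checkpoints.
    checkpoint_glob = pathspec.given(task_id='*').getpath('runner_checkpoint')
    runner_checkpoints = glob.glob(checkpoint_glob)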

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/planner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/planner.py b/src/main/python/twitter/thermos/common/planner.py
deleted file mode 100644
index a886dad..0000000
--- a/src/main/python/twitter/thermos/common/planner.py
+++ /dev/null
@@ -1,304 +0,0 @@
-"""Planners to schedule processes within Tasks.
-
-TaskPlanner:
-  - a daemon process can depend upon a regular process
-  - a regular process cannot depend upon a daemon process
-  - a non-ephemeral process cannot depend upon an ephemeral process
-"""
-
-from collections import defaultdict, namedtuple
-import copy
-from functools import partial
-import sys
-import time
-
-class Planner(object):
-  """
-    Given a set of process names and a graph of dependencies between them, determine
-    what can run predicated upon process completions.
-  """
-  class InvalidSchedule(Exception): pass
-
-  @staticmethod
-  def filter_runnable(processes, dependencies):
-    return set(process for process in processes if not dependencies.get(process))
-
-  @staticmethod
-  def filter_dependencies(dependencies, given=frozenset()):
-    """
-      Provided a map of process => list of process :dependencies, and a set of satisfied
-      prior processes in :given, return the new map of dependencies with priors removed.
-    """
-    dependencies = copy.deepcopy(dependencies)
-    for process_set in dependencies.values():
-      process_set -= given
-    return dependencies
-
-  @staticmethod
-  def satisfiable(processes, dependencies):
-    """
-      Given a set of processes and a dependency map, determine if this is a consistent
-      schedule without cycles.
-    """
-    processes = copy.copy(processes)
-    dependencies = copy.deepcopy(dependencies)
-
-    scheduling = True
-    while scheduling:
-      scheduling = False
-      runnables = Planner.filter_runnable(processes, dependencies)
-      if runnables:
-        scheduling = True
-        processes -= runnables
-      dependencies = Planner.filter_dependencies(dependencies, given=runnables)
-    return len(processes) == 0
-
-  def __init__(self, processes, dependencies):
-    self._processes = set(processes)
-    self._dependencies = dict((process, set(dependencies.get(process, [])))
-        for process in self._processes)
-    if not Planner.satisfiable(self._processes, self._dependencies):
-      raise Planner.InvalidSchedule("Cycles detected in the task schedule!")
-    self._running = set()
-    self._finished = set()
-    self._failed = set()
-
-  @property
-  def runnable(self):
-    return Planner.filter_runnable(self._processes - self._running - self._finished - self._failed,
-      Planner.filter_dependencies(self._dependencies, given=self._finished))
-
-  @property
-  def processes(self):
-    return set(self._processes)
-
-  @property
-  def running(self):
-    return set(self._running)
-
-  @property
-  def finished(self):
-    return set(self._finished)
-
-  @property
-  def failed(self):
-    return set(self._failed)
-
-  def reset(self, process):
-    assert process in self._running
-    assert process not in self._finished
-    assert process not in self._failed
-    self._running.discard(process)
-
-  def set_running(self, process):
-    assert process not in self._failed
-    assert process not in self._finished
-    assert process in self._running or process in self.runnable
-    self._running.add(process)
-
-  def set_finished(self, process):
-    assert process in self._running
-    assert process not in self._failed
-    self._running.discard(process)
-    self._finished.add(process)
-
-  def set_failed(self, process):
-    assert process in self._running
-    assert process not in self._finished
-    self._running.discard(process)
-    self._failed.add(process)
-
-  def is_complete(self):
-    return self._finished.union(self._failed) == self._processes
-
-
-TaskAttributes = namedtuple('TaskAttributes', 'min_duration is_daemon max_failures is_ephemeral')
-
-class TaskPlanner(object):
-  """
-    A planner for the processes part of a Thermos task, taking into account ephemeral and daemon
-    bits, in addition to duration restrictions [and failure limits?].
-
-                               is_daemon
-         .----------------------------------------------------.
-         |                                                    |
-         |    clock gate      .----------------------.        |
-         |  .---------------> | runnable && !waiting |        |
-         v  |                 `----------------------'        |
-       .----------.                       |                   |
-       | runnable |                       | set_running       |
-       `----------'                       v                   |
-            ^        forget          .---------.              |  !is_daemon  .----------.
-            `------------------------| running |--------------+------------> | finished |
-            ^                        `---------' add_success                 `----------'
-            |                             |
-            |     under failure limit     | add_failure
-            `-----------------------------+
-                                          | past failure limit
-                                          v
-                                      .--------.
-                                      | failed |
-                                      `--------'
-  """
-  InvalidSchedule = Planner.InvalidSchedule
-  INFINITY = sys.float_info.max
-  TOTAL_RUN_LIMIT = sys.maxsize
-
-  @staticmethod
-  def extract_dependencies(task, process_filter=None):
-    """
-      Construct a set of processes and the process dependencies from a Thermos Task.
-    """
-    process_map = dict((process.name().get(), process)
-                        for process in filter(process_filter, task.processes()))
-    processes = set(process_map)
-    dependencies = defaultdict(set)
-    if task.has_constraints():
-      for constraint in task.constraints():
-        # handle process orders
-        process_names = constraint.order().get()
-        process_name_set = set(process_names)
-        # either all process_names must be in processes or none should be
-        if process_name_set.issubset(processes) == process_name_set.isdisjoint(processes):
-          raise TaskPlanner.InvalidSchedule('Invalid process dependencies!')
-        if not process_name_set.issubset(processes):
-          continue
-        for k in range(1, len(process_names)):
-          pnk, pnk1 = process_names[k], process_names[k-1]
-          if process_map[pnk1].daemon().get():
-            raise TaskPlanner.InvalidSchedule(
-              'Process %s may not depend upon daemon process %s' % (pnk, pnk1))
-          if not process_map[pnk].ephemeral().get() and process_map[pnk1].ephemeral().get():
-            raise TaskPlanner.InvalidSchedule(
-              'Non-ephemeral process %s may not depend upon ephemeral process %s' % (pnk, pnk1))
-          dependencies[pnk].add(pnk1)
-    return (processes, dependencies)
-
-  def __init__(self, task, clock=time, process_filter=None):
-    self._filter = process_filter
-    assert self._filter is None or callable(self._filter), (
-        'TaskPlanner must be given callable process filter.')
-    self._planner = Planner(*self.extract_dependencies(task, self._filter))
-    self._clock = clock
-    self._last_terminal = {} # process => timestamp of last terminal state
-    self._failures = defaultdict(int)
-    self._successes = defaultdict(int)
-    self._attributes = {}
-    self._ephemerals = set(process.name().get() for process in task.processes()
-        if (self._filter is None or self._filter(process)) and process.ephemeral().get())
-
-    for process in filter(self._filter, task.processes()):
-      self._attributes[process.name().get()] = TaskAttributes(
-        is_daemon=bool(process.daemon().get()),
-        is_ephemeral=bool(process.ephemeral().get()),
-        max_failures=process.max_failures().get(),
-        min_duration=process.min_duration().get())
-
-  def get_wait(self, process, timestamp=None):
-    now = timestamp if timestamp is not None else self._clock.time()
-    if process not in self._last_terminal:
-      return 0
-    return self._attributes[process].min_duration - (now - self._last_terminal[process])
-
-  def is_ready(self, process, timestamp=None):
-    return self.get_wait(process, timestamp) <= 0
-
-  def is_waiting(self, process, timestamp=None):
-    return not self.is_ready(process, timestamp)
-
-  @property
-  def runnable(self):
-    """The set of processes that are currently runnable, respecting min_duration restrictions."""
-    return self.runnable_at(self._clock.time())
-
-  @property
-  def waiting(self):
-    """The set of runnable processes still waiting out their min_duration."""
-    return self.waiting_at(self._clock.time())
-
-  def runnable_at(self, timestamp):
-    return set(filter(partial(self.is_ready, timestamp=timestamp), self._planner.runnable))
-
-  def waiting_at(self, timestamp):
-    return set(filter(partial(self.is_waiting, timestamp=timestamp), self._planner.runnable))
-
-  def min_wait(self, timestamp=None):
-    """Return the current wait time for the next process to become runnable, 0 if something is ready
-       immediately, or sys.float.max if there are no waiters."""
-    if self.runnable_at(timestamp if timestamp is not None else self._clock.time()):
-      return 0
-    waits = [self.get_wait(waiter, timestamp) for waiter in self.waiting_at(timestamp)]
-    return min(waits) if waits else self.INFINITY
-
-  def set_running(self, process):
-    self._planner.set_running(process)
-
-  def add_failure(self, process, timestamp=None):
-    """Increment the failure count of a process, and reset it to runnable if maximum number of
-    failures has not been reached, or mark it as failed otherwise (ephemeral processes do not
-    count towards the success of a task, and are hence marked finished instead)"""
-    timestamp = timestamp if timestamp is not None else self._clock.time()
-    self._last_terminal[process] = timestamp
-    self._failures[process] += 1
-    self.failure_transition(process)
-
-  def has_reached_run_limit(self, process):
-    return (self._successes[process] + self._failures[process]) >= self.TOTAL_RUN_LIMIT
-
-  def failure_transition(self, process):
-    if self.has_reached_run_limit(process):
-      self._planner.set_failed(process)
-      return
-
-    if self._attributes[process].max_failures == 0 or (
-        self._failures[process] < self._attributes[process].max_failures):
-      self._planner.reset(process)
-    elif self._attributes[process].is_ephemeral:
-      self._planner.set_finished(process)
-    else:
-      self._planner.set_failed(process)
-
-  def add_success(self, process, timestamp=None):
-    """Reset a process to runnable if it is a daemon, or mark it as finished otherwise."""
-    timestamp = timestamp if timestamp is not None else self._clock.time()
-    self._last_terminal[process] = timestamp
-    self._successes[process] += 1
-    self.success_transition(process)
-
-  def success_transition(self, process):
-    if self.has_reached_run_limit(process):
-      self._planner.set_failed(process)
-      return
-
-    if not self._attributes[process].is_daemon:
-      self._planner.set_finished(process)
-    else:
-      self._planner.reset(process)
-
-  def set_failed(self, process):
-    """Force a process to be in failed state.  E.g. kill -9 and you want it pinned failed."""
-    self._planner.set_failed(process)
-
-  def lost(self, process):
-    """Mark a process as lost.  This sets its runnable state back to the previous runnable
-       state and does not increment its failure count."""
-    self._planner.reset(process)
-
-  def is_complete(self):
-    """A task is complete if all ordinary processes are finished or failed (there may still be
-       running ephemeral processes)"""
-    terminals = self._planner.finished.union(self._planner.failed).union(self._ephemerals)
-    return self._planner.processes == terminals
-
-  # TODO(wickman) Should we consider subclassing again?
-  @property
-  def failed(self):
-    return self._planner.failed
-
-  @property
-  def running(self):
-    return self._planner.running
-
-  @property
-  def finished(self):
-    return self._planner.finished
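
For illustration, a short sketch of driving the Planner state machine above (the process names are made up):

    from twitter.thermos.common.planner import Planner

    # 'web' depends on 'db'; 'cron' is independent.
    planner = Planner(processes=['db', 'web', 'cron'],
                      dependencies={'web': ['db']})
    assert planner.runnable == set(['db', 'cron'])

    planner.set_running('db')
    planner.set_finished('db')
    assert 'web' in planner.runnable    # unblocked by 'db' finishing

    planner.set_running('web')
    planner.set_finished('web')
    planner.set_running('cron')
    planner.set_failed('cron')
    assert planner.is_complete()        # everything finished or failed

TaskPlanner layers the daemon/ephemeral/min_duration semantics from the state diagram above on top of this via add_success, add_failure, and min_wait.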

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/BUILD b/src/main/python/twitter/thermos/config/BUILD
deleted file mode 100644
index 57e7f0d..0000000
--- a/src/main/python/twitter/thermos/config/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-import os
-
-python_library(
-  name = 'schema',
-  sources = globs('*.py'),
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('src/main/python/twitter/thermos:pystachio'),
-    pants('src/main/python/twitter/thermos/common:planner')
-  ],
-)
-
-python_library(
-  name = 'config',
-  dependencies = [
-    pants(':schema'),
-    pants('src/main/python/twitter/thermos/common'),  # cover common:planner
-  ],
-  provides = setup_py(
-    name = 'twitter.thermos.config',
-    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-    description = 'Thermos configuration schema and loader.',
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/__init__.py b/src/main/python/twitter/thermos/config/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/bin/config_load.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/bin/config_load.py b/src/main/python/twitter/thermos/config/bin/config_load.py
deleted file mode 100644
index c0680ec..0000000
--- a/src/main/python/twitter/thermos/config/bin/config_load.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import copy
-import sys
-import json
-import pprint
-
-from twitter.common import app
-from twitter.thermos.config.loader import ThermosConfigLoader
-
-def main(args):
-  """
-    Load the given .thermos configs and print information about each of them.
-  """
-
-  if len(args) == 0:
-    app.help()
-
-  for arg in args:
-    print '\nparsing %s\n' % arg
-    tc = ThermosConfigLoader.load(arg)
-
-    for task_wrapper in tc.tasks():
-      task = task_wrapper.task
-      if not task.has_name():
-        print 'Found unnamed task!  Skipping...'
-        continue
-
-      print 'Task: %s [check: %s]' % (task.name(), task.check())
-      if not task.processes():
-        print '  No processes.'
-      else:
-        print '  Processes:'
-        for proc in task.processes():
-          print '    %s' % proc
-
-      ports = task_wrapper.ports()
-      if not ports:
-        print '  No unbound ports.'
-      else:
-        print '  Ports:'
-        for port in ports:
-          print '    %s' % port
-
-      print 'raw:'
-      pprint.pprint(json.loads(task_wrapper.to_json()))
-
-app.set_usage("%s config1 config2 ..." % app.name())
-app.main()
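
For context, a .thermos file is a Python module evaluated against ThermosConfigLoader.SCHEMA (see loader.py later in this patch), so Process, Task, and export() are already in scope. A minimal illustrative config:

    # hello_world.thermos
    hello = Process(name='hello', cmdline='echo hello world')

    export(Task(name='hello_world',
                processes=[hello],
                resources=Resources(cpu=1.0, ram=64 * MB, disk=64 * MB)))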

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/bin/config_repl.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/bin/config_repl.py b/src/main/python/twitter/thermos/config/bin/config_repl.py
deleted file mode 100644
index 4a5c797..0000000
--- a/src/main/python/twitter/thermos/config/bin/config_repl.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from twitter.thermos.config.schema import *
-from code import interact
-interact('Thermos Config REPL', local=locals())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/dsl.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/dsl.py b/src/main/python/twitter/thermos/config/dsl.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/loader.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/loader.py b/src/main/python/twitter/thermos/config/loader.py
deleted file mode 100644
index 54721c3..0000000
--- a/src/main/python/twitter/thermos/config/loader.py
+++ /dev/null
@@ -1,179 +0,0 @@
-import copy
-import json
-import os
-import re
-import textwrap
-
-from twitter.common.dirutil import safe_open
-from twitter.common.lang import Compatibility
-from twitter.thermos.common.planner import TaskPlanner
-from twitter.thermos.config.schema import Task
-
-from pystachio import Ref
-from pystachio.config import Config
-
-
-class PortExtractor(object):
-  class InvalidPorts(Exception): pass
-
-  @staticmethod
-  def extract(obj):
-    port_scope = Ref.from_address('thermos.ports')
-    _, uninterp = obj.interpolate()
-    ports = []
-    for ref in uninterp:
-      subscope = port_scope.scoped_to(ref)
-      if subscope is not None:
-        if not subscope.is_index():
-          raise PortExtractor.InvalidPorts(
-            'Bad port specification "%s" (should be of the form "thermos.ports[name]")' % ref.address())
-        ports.append(subscope.action().value)
-    return ports
-
-
-class ThermosProcessWrapper(object):
-  # >=1 characters && anything but NULL and '/'
-  VALID_PROCESS_NAME_RE = re.compile(r'^[^./][^/]*$')
-  class InvalidProcess(Exception): pass
-
-  def __init__(self, process):
-    self._process = process
-
-  def ports(self):
-    try:
-      return PortExtractor.extract(self._process)
-    except PortExtractor.InvalidPorts:
-      raise self.InvalidProcess('Process has invalid ports scoping!')
-
-  @staticmethod
-  def assert_valid_process_name(name):
-    if not ThermosProcessWrapper.VALID_PROCESS_NAME_RE.match(name):
-      raise ThermosProcessWrapper.InvalidProcess('Invalid process name: %s' % name)
-
-
-class ThermosTaskWrapper(object):
-  class InvalidTask(Exception): pass
-
-  def __init__(self, task, bindings=None, strict=True):
-    if bindings:
-      task = task.bind(*bindings)
-    if not task.check().ok() and strict:
-      raise ThermosTaskWrapper.InvalidTask(task.check().message())
-    self._task = task
-
-  @property
-  def task(self):
-    return self._task
-
-  def ports(self):
-    ti, _ = self._task.interpolate()
-    ports = set()
-    if ti.has_processes():
-      for process in ti.processes():
-        try:
-          ports.update(ThermosProcessWrapper(process).ports())
-        except ThermosProcessWrapper.InvalidProcess:
-          raise self.InvalidTask('Task has invalid process: %s' % process)
-    return ports
-
-  def to_json(self):
-    return json.dumps(self._task.get())
-
-  def to_file(self, filename):
-    ti, _ = self._task.interpolate()
-    with safe_open(filename, 'w') as fp:
-      json.dump(ti.get(), fp)
-
-  @staticmethod
-  def from_file(filename, **kw):
-    try:
-      with safe_open(filename) as fp:
-        task = Task.json_load(fp)
-      return ThermosTaskWrapper(task, **kw)
-    except Exception as e:
-      return None
-
-
-# TODO(wickman) These should be validators pushed onto ThermosConfigLoader.plugins
-class ThermosTaskValidator(object):
-  class InvalidTaskError(Exception): pass
-
-  @classmethod
-  def assert_valid_task(cls, task):
-    cls.assert_valid_names(task)
-    cls.assert_typecheck(task)
-    cls.assert_valid_plan(task)
-
-  @classmethod
-  def assert_valid_plan(cls, task):
-    try:
-      TaskPlanner(task, process_filter=lambda proc: proc.final().get() == False)
-      TaskPlanner(task, process_filter=lambda proc: proc.final().get() == True)
-    except TaskPlanner.InvalidSchedule as e:
-      raise cls.InvalidTaskError('Task has invalid plan: %s' % e)
-
-  @classmethod
-  def assert_valid_names(cls, task):
-    for process in task.processes():
-      name = process.name().get()
-      try:
-        ThermosProcessWrapper.assert_valid_process_name(name)
-      except ThermosProcessWrapper.InvalidProcess as e:
-        raise cls.InvalidTaskError('Task has invalid process: %s' % e)
-
-  @classmethod
-  def assert_typecheck(cls, task):
-    typecheck = task.check()
-    if not typecheck.ok():
-      raise cls.InvalidTaskError('Failed to fully evaluate task: %s' %
-        typecheck.message())
-
-  @classmethod
-  def assert_valid_ports(cls, task, portmap):
-    for port in ThermosTaskWrapper(task).ports():
-      if port not in portmap:
-        raise cls.InvalidTaskError('Task requires unbound port %s!' % port)
-
-  @classmethod
-  def assert_same_task(cls, spec, task):
-    active_task = spec.given(state='active').getpath('task_path')
-    if os.path.exists(active_task):
-      task_on_disk = ThermosTaskWrapper.from_file(active_task)
-      if not task_on_disk or task_on_disk.task != task:
-        raise cls.InvalidTaskError('Task differs from on disk copy: %r vs %r' % (
-            task_on_disk.task if task_on_disk else None, task))
-
-
-class ThermosConfigLoader(object):
-  SCHEMA = textwrap.dedent("""
-    from pystachio import *
-    from twitter.thermos.config.schema import *
-
-    __TASKS = []
-
-    def export(task):
-      __TASKS.append(Task(task) if isinstance(task, dict) else task)
-  """)
-
-  @classmethod
-  def load(cls, loadable, **kw):
-    config = Config(loadable, schema=cls.SCHEMA)
-    return cls(ThermosTaskWrapper(task, **kw) for task in config.environment['__TASKS'])
-
-  @classmethod
-  def load_json(cls, filename, **kw):
-    tc = cls()
-    task = ThermosTaskWrapper.from_file(filename, **kw)
-    if task:
-      ThermosTaskValidator.assert_valid_task(task.task())
-      tc.add_task(task)
-    return tc
-
-  def __init__(self, exported_tasks=None):
-    self._exported_tasks = exported_tasks or []
-
-  def add_task(self, task):
-    self._exported_tasks.append(task)
-
-  def tasks(self):
-    return self._exported_tasks
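
For illustration, a sketch of how unbound ports surface through the wrappers above (the task definition is made up):

    from twitter.thermos.config.loader import ThermosTaskWrapper
    from twitter.thermos.config.schema import Process, Task

    task = Task(name='web', processes=[
        Process(name='web', cmdline='run_server --port {{thermos.ports[http]}}')])

    # {{thermos.ports[http]}} survives interpolation as an uninterpolated ref,
    # so PortExtractor reports it as the unbound port 'http'.
    assert ThermosTaskWrapper(task, strict=False).ports() == set(['http'])

ThermosTaskValidator.assert_valid_ports would then reject any portmap that does not bind 'http'.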

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/schema.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/schema.py b/src/main/python/twitter/thermos/config/schema.py
deleted file mode 100644
index 2ee38a1..0000000
--- a/src/main/python/twitter/thermos/config/schema.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from .schema_base import *
-from .schema_helpers import *

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/schema_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/schema_base.py b/src/main/python/twitter/thermos/config/schema_base.py
deleted file mode 100644
index a47cfdf..0000000
--- a/src/main/python/twitter/thermos/config/schema_base.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from pystachio import (
-  Boolean,
-  Default,
-  Empty,
-  Float,
-  Integer,
-  List,
-  Map,
-  Required,
-  String,
-  Struct
-)
-
-
-# Define constants for resources
-BYTES = 1
-KB = 1024 * BYTES
-MB = 1024 * KB
-GB = 1024 * MB
-TB = 1024 * GB
-
-
-class ThermosContext(Struct):
-  # TODO(wickman) Move the underlying replacement mechanism to %port% replacements
-  ports   = Map(String, Integer)
-
-  # TODO(wickman) Move the underlying replacement mechanism to %task_id%
-  task_id = String
-
-  # TODO(wickman) Move underlying mechanism to %user%
-  user    = String
-
-
-class Resources(Struct):
-  cpu  = Required(Float)
-  ram  = Required(Integer)
-  disk = Required(Integer)
-
-
-class Constraint(Struct):
-  order = List(String)
-
-
-class Process(Struct):
-  cmdline = Required(String)
-  name    = Required(String)
-
-  # This is currently unused but reserved for future use by Thermos.
-  resources     = Resources
-
-  # optionals
-  max_failures  = Default(Integer, 1)      # maximum number of failed process runs
-                                           # before process is failed.
-  daemon        = Default(Boolean, False)
-  ephemeral     = Default(Boolean, False)
-  min_duration  = Default(Integer, 5)      # integer seconds
-  final         = Default(Boolean, False)  # if this process should be a finalizing process
-                                           # that should always be run after regular processes
-
-
-class Task(Struct):
-  name = Default(String, '{{processes[0].name}}')
-  processes = List(Process)
-
-  # optionals
-  constraints = Default(List(Constraint), [])
-  resources = Resources
-  max_failures = Default(Integer, 1)        # maximum number of failed processes before task is failed.
-  max_concurrency = Default(Integer, 0)     # 0 is infinite concurrency.
-                                            # > 0 is max concurrent processes.
-  finalization_wait = Default(Integer, 30)  # the amount of time in seconds we allocate to run the
-                                            # finalization schedule.
-
-  # TODO(jon): remove/replace with proper solution to MESOS-3546
-  user = String
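
For illustration, a minimal task built directly against this schema (all values are made up):

    from twitter.thermos.config.schema_base import MB, Process, Resources, Task

    hello = Process(name='hello', cmdline='echo hello')
    world = Process(name='world', cmdline='echo world', max_failures=2)

    task = Task(name='hello_world',   # would default to '{{processes[0].name}}'
                processes=[hello, world],
                resources=Resources(cpu=1.0, ram=128 * MB, disk=128 * MB))
    assert task.check().ok()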

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/config/schema_helpers.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/config/schema_helpers.py b/src/main/python/twitter/thermos/config/schema_helpers.py
deleted file mode 100644
index 502de5a..0000000
--- a/src/main/python/twitter/thermos/config/schema_helpers.py
+++ /dev/null
@@ -1,247 +0,0 @@
-"""Helpers for composing Thermos workflows."""
-import itertools
-
-from twitter.common.lang import Compatibility
-
-from .schema_base import (
-   Constraint,
-   GB,
-   Process,
-   Resources,
-   Task,
-)
-
-from pystachio import Empty, List
-
-
-__all__ = (
-  # shorthand for process ordering constraint
-  'order',
-
-  # task combinators
-  'combine_tasks',    # merge N tasks in parallel
-  'concat_tasks',     # serialize N tasks
-
-  # options helpers
-  'java_options',
-  'python_options',
-
-  # the automatically-sequential version of a task
-  'SequentialTask',
-
-  # create a simple task from a command line + name
-  'SimpleTask',
-
-  # helper classes
-  'Options',
-  'Processes',
-  'Tasks',
-  'Units',
-)
-
-
-class Units(object):
-  """Helpers for base units of Tasks and Processes."""
-
-  @classmethod
-  def optional_resources(cls, resources):
-    return Resources() if resources is Empty else resources
-
-  @classmethod
-  def resources_sum(cls, *resources):
-    """Add two Resources objects together."""
-    def add_unit(f1, f2):
-      return (0 if f1 is Empty else f1.get()) + (0 if f2 is Empty else f2.get())
-
-    def add(r1, r2):
-      return Resources(cpu=add_unit(r1.cpu(), r2.cpu()),
-                       ram=add_unit(r1.ram(), r2.ram()),
-                       disk=add_unit(r1.disk(), r2.disk()))
-
-    return reduce(add, map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
-
-  @classmethod
-  def resources_max(cls, resources):
-    """Return a Resource object that is the maximum of the inputs along each
-      resource dimension."""
-    def max_unit(f1, f2):
-      return max(0 if f1 is Empty else f1.get(), 0 if f2 is Empty else f2.get())
-
-    def resource_max(r1, r2):
-      return Resources(cpu=max_unit(r1.cpu(), r2.cpu()),
-                       ram=max_unit(r1.ram(), r2.ram()),
-                       disk=max_unit(r1.disk(), r2.disk()))
-
-    return reduce(resource_max,
-        map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
-
-  @classmethod
-  def processes_merge(cls, tasks):
-    """Return a deduped list of the processes from all tasks."""
-    return list(set(itertools.chain.from_iterable(task.processes() for task in tasks)))
-
-  @classmethod
-  def constraints_merge(cls, tasks):
-    """Return a deduped list of the constraints from all tasks."""
-    return list(set(itertools.chain.from_iterable(task.constraints() for task in tasks)))
-
-
-class Processes(object):
-  """Helper class for Process objects."""
-
-  @classmethod
-  def _process_name(cls, process):
-    if isinstance(process, Process):
-      return process.name()
-    elif isinstance(process, Compatibility.string):
-      return process
-    raise ValueError("Unknown value for process order: %s" % repr(process))
-
-  @classmethod
-  def order(cls, *processes):
-    """Given a list of processes, return the list of constraints that keeps them in order, e.g.
-       order(p1, p2, p3) => [Constraint(order=[p1.name(), p2.name(), p3.name()])].
-
-       Similarly, concatenation operations are valid, i.e.
-          order(p1, p2) + order(p2, p3) <=> order(p1, p2, p3)
-    """
-    return [Constraint(order=[cls._process_name(p) for p in processes])]
-
-
-class Tasks(object):
-  """Helper class for Task objects."""
-
-  SIMPLE_CPU  = 1.0
-  SIMPLE_RAM  = 1 * GB
-  SIMPLE_DISK = 1 * GB
-
-  @classmethod
-  def _combine_processes(cls, *tasks):
-    """Given multiple tasks, merge their processes together, retaining the identity of the first
-       task."""
-    if len(tasks) == 0:
-      return Task()
-    head_task = tasks[-1]
-    return head_task(processes=Units.processes_merge(tasks))
-
-  @classmethod
-  def combine(cls, *tasks, **kw):
-    """Given multiple tasks, return a Task that runs all processes in parallel."""
-    if len(tasks) == 0:
-      return Task()
-    base = cls._combine_processes(*tasks)
-    return base(
-      resources=Units.resources_sum(*(task.resources() for task in tasks)),
-      constraints=Units.constraints_merge(tasks),
-      **kw
-    )
-
-  @classmethod
-  def concat(cls, *tasks, **kw):
-    """Given tasks T1...TN, return a single Task that runs all processes such that
-       all processes in Tk run before any process in Tk+1."""
-    if len(tasks) == 0:
-      return Task()
-    base = cls._combine_processes(*tasks)
-    base = base(resources=Units.resources_max(task.resources() for task in tasks))
-    base_constraints = Units.constraints_merge(tasks)
-    # TODO(wickman) be smarter about this in light of existing constraints
-    for (t1, t2) in zip(tasks[0:-1], tasks[1:]):
-      for p1 in t1.processes():
-        for p2 in t2.processes():
-          if p1 != p2:
-            base_constraints.extend(Processes.order(p1, p2))
-    return base(constraints=base_constraints, **kw)
-
-  @classmethod
-  def simple(cls, name, command):
-    """Create a usable Task from a provided name + command line and a default set of resources"""
-    return Task(
-      name=name,
-      processes=[Process(name=name, cmdline=command)],
-      resources=Resources(cpu=cls.SIMPLE_CPU,
-                          ram=cls.SIMPLE_RAM,
-                          disk=cls.SIMPLE_DISK))
-
-  @classmethod
-  def sequential(cls, task):
-    """Add a constraint that makes all processes within a task run sequentially."""
-    def maybe_constrain(task):
-      return {'constraints': order(*task.processes())} if task.processes() is not Empty else {}
-    if task.constraints() is Empty or task.constraints() == List(Constraint)([]):
-      return task(**maybe_constrain(task))
-    raise ValueError('Cannot turn a Task with existing constraints into a SequentialTask!')
-
-
-class Options(object):
-  """Helper class for constructing command-line arguments."""
-
-  @classmethod
-  def render_option(cls, short_prefix, long_prefix, option, value=None):
-    option = '%s%s' % (short_prefix if len(option) == 1 else long_prefix, option)
-    return '%s %s' % (option, value) if value else option
-
-  @classmethod
-  def render_options(cls, short_prefix, long_prefix, *options, **kw_options):
-    renders = []
-
-    for option in options:
-      if isinstance(option, Compatibility.string):
-        renders.append(cls.render_option(short_prefix, long_prefix, option))
-      elif isinstance(option, dict):
-        # preserve order in case option is an OrderedDict, rather than recursing with **option
-        for argument, value in option.items():
-          renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
-      else:
-        raise ValueError('Got an unexpected argument to render_options: %s' % repr(option))
-
-    for argument, value in kw_options.items():
-      renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
-
-    return renders
-
-  @classmethod
-  def java(cls, *options, **kw_options):
-    """
-      Given a set of arguments, keyword arguments or dictionaries, render
-      command-line parameters accordingly.  For example:
-
-        java_options('a', 'b') == '-a -b'
-        java_options({
-          'a': 23,
-          'b': 'foo'
-        }) == '-a 23 -b foo'
-        java_options(a=23, b='foo') == '-a 23 -b foo'
-    """
-    return ' '.join(cls.render_options('-', '-', *options, **kw_options))
-
-  @classmethod
-  def python(cls, *options, **kw_options):
-    """
-      Given a set of arguments, keyword arguments or dictionaries, render
-      command-line parameters accordingly.  Single letter parameters are
-      rendered with single '-'.  For example:
-
-        python_options('a', 'boo') == '-a --boo'
-        python_options({
-          'a': 23,
-          'boo': 'foo'
-        }) == '-a 23 --boo foo'
-        python_options(a=23, boo='foo') == '-a 23 --boo foo'
-    """
-    return ' '.join(cls.render_options('-', '--', *options, **kw_options))
-
-
-def SimpleTask(name, command):
-  """A simple command-line Task with default resources"""
-  return Tasks.simple(name, command)
-
-def SequentialTask(*args, **kw):
-  """A Task whose processes are always sequential."""
-  return Tasks.sequential(Task(*args, **kw))
-
-python_options = Options.python
-java_options = Options.java
-combine_tasks = Tasks.combine
-concat_tasks = Tasks.concat
-order = Processes.order
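
For illustration, a sketch of the combinators above (the tasks are made up):

    from twitter.thermos.config.schema import (
        Process, Task, combine_tasks, concat_tasks, java_options, python_options)

    stage = Task(name='stage', processes=[Process(name='fetch', cmdline='fetch_pkg')])
    serve = Task(name='serve', processes=[Process(name='run', cmdline='run_server')])

    # All processes run in parallel; resources are summed per dimension.
    parallel = combine_tasks(stage, serve)

    # Every process of 'stage' is ordered before every process of 'serve';
    # resources are the per-dimension maximum.
    serial = concat_tasks(stage, serve)

    assert java_options(a=23) == '-a 23'
    assert python_options('a', 'boo') == '-a --boo'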

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/BUILD b/src/main/python/twitter/thermos/core/BUILD
deleted file mode 100644
index d260112..0000000
--- a/src/main/python/twitter/thermos/core/BUILD
+++ /dev/null
@@ -1,88 +0,0 @@
-import os
-
-python_library(
-  name = 'helper',
-  sources = ['helper.py'],
-  dependencies = [
-    pants('src/main/python/twitter/thermos:psutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/python/twitter/thermos/common:path'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'inspector',
-  sources = ['inspector.py'],
-  dependencies = [
-    pants(':muxer'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/python/twitter/thermos/common:path'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'muxer',
-  sources = ['muxer.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'process',
-  sources = ['process.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'runner',
-  sources = ['__init__.py', 'runner.py'],
-  dependencies = [
-    pants(':helper'),
-    pants(':muxer'),
-    pants(':process'),
-    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/python/twitter/thermos:psutil'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/python/twitter/thermos/common:path'),
-    pants('src/main/python/twitter/thermos/common:planner'),
-    pants('src/main/python/twitter/thermos/config:schema'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'core',
-  dependencies = [
-    pants(':inspector'),
-    pants(':runner'),
-
-    # covering libs
-    pants('src/main/python/twitter/thermos/common'),
-    pants('src/main/python/twitter/thermos/config'),
-  ],
-  provides = setup_py(
-    name = 'twitter.thermos.core',
-    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-    description = 'The Thermos core state machine.',
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/__init__.py b/src/main/python/twitter/thermos/core/__init__.py
deleted file mode 100644
index de40ea7..0000000
--- a/src/main/python/twitter/thermos/core/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/helper.py b/src/main/python/twitter/thermos/core/helper.py
deleted file mode 100644
index 3587a85..0000000
--- a/src/main/python/twitter/thermos/core/helper.py
+++ /dev/null
@@ -1,387 +0,0 @@
-from contextlib import closing
-import errno
-import os
-import signal
-import time
-
-from twitter.common import log
-from twitter.common.dirutil import lock_file, safe_mkdir
-from twitter.common.quantity import Amount, Time
-from twitter.common.recordio import ThriftRecordWriter
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.path import TaskPath
-
-from gen.twitter.thermos.ttypes import (
-  ProcessState,
-  ProcessStatus,
-  RunnerCkpt,
-  TaskState,
-  TaskStatus)
-
-import psutil
-
-
-class TaskKiller(object):
-  """
-    Task killing interface.
-  """
-
-  def __init__(self, task_id, checkpoint_root):
-    self._task_id = task_id
-    self._checkpoint_root = checkpoint_root
-
-  def kill(self, force=True):
-    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
-                          terminal_status=TaskState.KILLED)
-
-  def lose(self, force=True):
-    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
-                          terminal_status=TaskState.LOST)
-
-
-class TaskRunnerHelper(object):
-  """
-    TaskRunner helper methods that can be operated directly upon checkpoint
-    state.  These operations do not require knowledge of the underlying
-    task.
-
-    TaskRunnerHelper is a mishmash of "checkpoint-only" operations and the
-    "Process Platform" functionality that has started to migrate into process.py.
-
-    This really needs some hard design thought to see whether it can be factored
-    out even further.
-  """
-  class Error(Exception): pass
-  class PermissionError(Error): pass
-
-  # Maximum drift between when the system says a process was forked and when we checkpointed
-  # its fork_time (a heuristic to determine that a forked process is really ours, rather than
-  # a process that coincidentally received the same PID after the pid space wrapped around).
-  MAX_START_TIME_DRIFT = Amount(10, Time.SECONDS)
-
-  @staticmethod
-  def get_actual_user():
-    import getpass, pwd
-    try:
-      pwd_entry = pwd.getpwuid(os.getuid())
-    except KeyError:
-      return getpass.getuser()
-    return pwd_entry[0]
-
-  @staticmethod
-  def process_from_name(task, process_name):
-    if task.has_processes():
-      for process in task.processes():
-        if process.name().get() == process_name:
-          return process
-    return None
-
-  @classmethod
-  def this_is_really_our_pid(cls, process, current_user, start_time):
-    """
-      A heuristic to make sure that this is likely the pid that we own/forked.  Necessary
-      because of pid-space wrapping.  We don't want to go and kill processes we don't own,
-      especially if the killer is running as root.
-
-      process: psutil.Process representing the process to check
-      current_user: user expected to own the process
-      start_time: time at which the process is expected to have started
-
-      Raises:
-        psutil.NoSuchProcess - if the Process supplied no longer exists
-    """
-    if process.username != current_user:
-      log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
-        process.pid, process.username, current_user))
-      return False
-
-    if abs(start_time - process.create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
-      log.info("Expected pid %s start time to be %s but it's %s" % (
-        process.pid, start_time, process.create_time))
-      return False
-
-    return True
-
-  @classmethod
-  def scan_process(cls, state, process_name):
-    """
-      Given a RunnerState and a process_name, return a tuple of
-        (coordinator pid, process pid, process tree)
-      with types (int or None, int or None, set).
-    """
-    process_run = state.processes[process_name][-1]
-    process_owner = state.header.user
-
-    coordinator_pid, pid, tree = None, None, set()
-
-    if process_run.coordinator_pid:
-      try:
-        coordinator_process = psutil.Process(process_run.coordinator_pid)
-        if cls.this_is_really_our_pid(coordinator_process, process_owner, process_run.fork_time):
-          coordinator_pid = process_run.coordinator_pid
-      except psutil.NoSuchProcess:
-        log.info('  Coordinator %s [pid: %s] completed.' % (process_run.process,
-            process_run.coordinator_pid))
-      except psutil.Error as err:
-        log.warning('  Error gathering information on pid %s: %s' % (process_run.coordinator_pid,
-            err))
-
-    if process_run.pid:
-      try:
-        process = psutil.Process(process_run.pid)
-        if cls.this_is_really_our_pid(process, process_owner, process_run.start_time):
-          pid = process.pid
-      except psutil.NoSuchProcess:
-        log.info('  Process %s [pid: %s] completed.' % (process_run.process, process_run.pid))
-      except psutil.Error as err:
-        log.warning('  Error gathering information on pid %s: %s' % (process_run.pid, err))
-      else:
-        if pid:
-          try:
-            tree = set(proc.pid for proc in process.get_children(recursive=True))
-          except psutil.Error:
-            log.warning('  Error gathering information on children of pid %s' % pid)
-
-    return (coordinator_pid, pid, tree)
-
-  @classmethod
-  def scantree(cls, state):
-    """
-      Scan the process tree associated with the provided task state.
-
-      Returns a dictionary of process name => (coordinator pid, pid, pid children)
-      If the coordinator is no longer active, coordinator pid will be None.  If the
-      forked process is no longer active, pid will be None and its children will be
-      an empty set.
-    """
-    return dict((process_name, cls.scan_process(state, process_name))
-                for process_name in state.processes)
-
-  @classmethod
-  def safe_signal(cls, pid, sig=signal.SIGTERM):
-    try:
-      os.kill(pid, sig)
-    except OSError as e:
-      if e.errno not in (errno.ESRCH, errno.EPERM):
-        log.error('Unexpected error in os.kill: %s' % e)
-    except Exception as e:
-      log.error('Unexpected error in os.kill: %s' % e)
-
-  @classmethod
-  def terminate_pid(cls, pid):
-    cls.safe_signal(pid, signal.SIGTERM)
-
-  @classmethod
-  def kill_pid(cls, pid):
-    cls.safe_signal(pid, signal.SIGKILL)
-
-  @classmethod
-  def kill_group(cls, pgrp):
-    cls.safe_signal(-pgrp, signal.SIGKILL)
-
-  @classmethod
-  def _get_process_tuple(cls, state, process_name):
-    assert process_name in state.processes and len(state.processes[process_name]) > 0
-    return cls.scan_process(state, process_name)
-
-  @classmethod
-  def _get_coordinator_group(cls, state, process_name):
-    assert process_name in state.processes and len(state.processes[process_name]) > 0
-    return state.processes[process_name][-1].coordinator_pid
-
-  @classmethod
-  def terminate_process(cls, state, process_name):
-    log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name)
-    _, pid, _ = cls._get_process_tuple(state, process_name)
-    if pid:
-      log.debug('   => SIGTERM pid %s' % pid)
-      cls.terminate_pid(pid)
-    return bool(pid)
-
-  @classmethod
-  def kill_process(cls, state, process_name):
-    log.debug('TaskRunnerHelper.kill_process(%s)' % process_name)
-    coordinator_pgid = cls._get_coordinator_group(state, process_name)
-    coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name)
-    # This is super dangerous.  TODO(wickman) Add a heuristic that determines that
-    # 1) there are processes that currently belong to this process group, and
-    # 2) those processes have inherited the coordinator checkpoint filehandle.
-    # This way we validate that it is in fact the process group we expect.
-    if coordinator_pgid:
-      log.debug('   => SIGKILL coordinator group %s' % coordinator_pgid)
-      cls.kill_group(coordinator_pgid)
-    if coordinator_pid:
-      log.debug('   => SIGKILL coordinator %s' % coordinator_pid)
-      cls.kill_pid(coordinator_pid)
-    if pid:
-      log.debug('   => SIGKILL pid %s' % pid)
-      cls.kill_pid(pid)
-    for child in tree:
-      log.debug('   => SIGKILL child %s' % child)
-      cls.kill_pid(child)
-    return bool(coordinator_pid or pid or tree)
-
-  @classmethod
-  def kill_runner(cls, state):
-    log.debug('TaskRunnerHelper.kill_runner()')
-    if not state or not state.statuses:
-      raise cls.Error('Could not read state!')
-    pid = state.statuses[-1].runner_pid
-    if pid == os.getpid():
-      raise cls.Error('Unwilling to commit seppuku.')
-    try:
-      os.kill(pid, signal.SIGKILL)
-      return True
-    except OSError as e:
-      if e.errno == errno.EPERM:
-        # Permission denied
-        return False
-      elif e.errno == errno.ESRCH:
-        # pid no longer exists
-        return True
-      raise
-
-  @classmethod
-  def open_checkpoint(cls, filename, force=False, state=None):
-    """
-      Acquire a locked checkpoint stream.
-    """
-    safe_mkdir(os.path.dirname(filename))
-    fp = lock_file(filename, "a+")
-    if fp in (None, False):
-      if force:
-        log.info('Found existing runner, forcing leadership forfeit.')
-        state = state or CheckpointDispatcher.from_file(filename)
-        if cls.kill_runner(state):
-          log.info('Successfully killed leader.')
-          # TODO(wickman)  Blocking may not be the best idea here.  Perhaps block up to
-          # a maximum timeout.  But blocking is necessary because os.kill does not immediately
-          # release the lock if we're in force mode.
-          fp = lock_file(filename, "a+", blocking=True)
-      else:
-        log.error('Found existing runner, cannot take control.')
-    if fp in (None, False):
-      raise cls.PermissionError('Could not open locked checkpoint: %s, lock_file = %s' %
-        (filename, fp))
-    ckpt = ThriftRecordWriter(fp)
-    ckpt.set_sync(True)
-    return ckpt
-
-  @classmethod
-  def kill(cls, task_id, checkpoint_root, force=False,
-           terminal_status=TaskState.KILLED, clock=time):
-    """
-      An implementation of Task killing that doesn't require a fully hydrated TaskRunner object.
-      Terminal status must be either KILLED or LOST state.
-    """
-    if terminal_status not in (TaskState.KILLED, TaskState.LOST):
-      raise cls.Error('terminal_status must be KILLED or LOST (got %s)' %
-                      TaskState._VALUES_TO_NAMES.get(terminal_status, terminal_status))
-    pathspec = TaskPath(root=checkpoint_root, task_id=task_id)
-    checkpoint = pathspec.getpath('runner_checkpoint')
-    state = CheckpointDispatcher.from_file(checkpoint)
-
-    if state is None or state.header is None or state.statuses is None:
-      if force:
-        log.error('Task has uninitialized TaskState - forcibly finalizing')
-        cls.finalize_task(pathspec)
-        return
-      else:
-        log.error('Cannot update states in uninitialized TaskState!')
-        return
-
-    ckpt = cls.open_checkpoint(checkpoint, force=force, state=state)
-
-    def write_task_state(state):
-      update = TaskStatus(state=state, timestamp_ms=int(clock.time() * 1000),
-                          runner_pid=os.getpid(), runner_uid=os.getuid())
-      ckpt.write(RunnerCkpt(task_status=update))
-
-    def write_process_status(status):
-      ckpt.write(RunnerCkpt(process_status=status))
-
-    if cls.is_task_terminal(state.statuses[-1].state):
-      log.info('Task is already in terminal state!  Finalizing.')
-      cls.finalize_task(pathspec)
-      return
-
-    with closing(ckpt):
-      write_task_state(TaskState.ACTIVE)
-      for process, history in state.processes.items():
-        process_status = history[-1]
-        if not cls.is_process_terminal(process_status.state):
-          if cls.kill_process(state, process):
-            write_process_status(ProcessStatus(process=process,
-              state=ProcessState.KILLED, seq=process_status.seq + 1, return_code=-9,
-              stop_time=clock.time()))
-          else:
-            if process_status.state is not ProcessState.WAITING:
-              write_process_status(ProcessStatus(process=process,
-                state=ProcessState.LOST, seq=process_status.seq + 1))
-      write_task_state(terminal_status)
-    cls.finalize_task(pathspec)
-
-  @classmethod
-  def reap_children(cls):
-    pids = set()
-
-    while True:
-      try:
-        pid, status, rusage = os.wait3(os.WNOHANG)
-        if pid == 0:
-          break
-        pids.add(pid)
-        log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % (
-          pid, status, rusage))
-      except OSError as e:
-        if e.errno != errno.ECHILD:
-          log.warning('Unexpected error when calling wait3: %s' % e)
-        break
-
-    return pids
-
-  TERMINAL_PROCESS_STATES = frozenset([
-    ProcessState.SUCCESS,
-    ProcessState.KILLED,
-    ProcessState.FAILED,
-    ProcessState.LOST])
-
-  TERMINAL_TASK_STATES = frozenset([
-    TaskState.SUCCESS,
-    TaskState.FAILED,
-    TaskState.KILLED,
-    TaskState.LOST])
-
-  @classmethod
-  def is_process_terminal(cls, process_status):
-    return process_status in cls.TERMINAL_PROCESS_STATES
-
-  @classmethod
-  def is_task_terminal(cls, task_status):
-    return task_status in cls.TERMINAL_TASK_STATES
-
-  @classmethod
-  def initialize_task(cls, spec, task):
-    active_task = spec.given(state='active').getpath('task_path')
-    finished_task = spec.given(state='finished').getpath('task_path')
-    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
-    if is_finished:
-      raise cls.Error('Cannot initialize task with "finished" record!')
-    if not is_active:
-      safe_mkdir(os.path.dirname(active_task))
-      with open(active_task, 'w') as fp:
-        fp.write(task)
-
-  @classmethod
-  def finalize_task(cls, spec):
-    active_task = spec.given(state='active').getpath('task_path')
-    finished_task = spec.given(state='finished').getpath('task_path')
-    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
-    if not is_active:
-      raise cls.Error('Cannot finalize task with no "active" record!')
-    elif is_finished:
-      raise cls.Error('Cannot finalize task with "finished" record!')
-    safe_mkdir(os.path.dirname(finished_task))
-    os.rename(active_task, finished_task)
-    os.utime(finished_task, None)
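
The kill path above leans on two small patterns: thrift enums are plain ints, so
readable logging goes through the generated _VALUES_TO_NAMES map with the raw value as
a fallback, and terminal-state checks are set membership. A minimal, self-contained
sketch of both (the TaskState class below is a stand-in with illustrative values, not
the real thrift-generated gen.twitter.thermos.ttypes module):

  class TaskState(object):
    ACTIVE, SUCCESS, FAILED, KILLED, LOST = range(5)
    _VALUES_TO_NAMES = {0: 'ACTIVE', 1: 'SUCCESS', 2: 'FAILED',
                        3: 'KILLED', 4: 'LOST'}

  TERMINAL_TASK_STATES = frozenset(
      [TaskState.SUCCESS, TaskState.FAILED, TaskState.KILLED, TaskState.LOST])

  def state_name(state):
    # Fall back to the raw value for states the map does not know about.
    return TaskState._VALUES_TO_NAMES.get(state, state)

  def validate_terminal(terminal_status):
    if terminal_status not in (TaskState.KILLED, TaskState.LOST):
      raise ValueError('terminal_status must be KILLED or LOST (got %s)'
                       % state_name(terminal_status))

  validate_terminal(TaskState.KILLED)               # ok
  print(state_name(TaskState.LOST))                 # => LOST
  print(TaskState.SUCCESS in TERMINAL_TASK_STATES)  # => True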

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/inspector.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/inspector.py b/src/main/python/twitter/thermos/core/inspector.py
deleted file mode 100644
index 8c646a1..0000000
--- a/src/main/python/twitter/thermos/core/inspector.py
+++ /dev/null
@@ -1,105 +0,0 @@
-from collections import namedtuple
-from contextlib import closing
-import pwd
-
-from twitter.common import log
-from twitter.common.recordio import RecordIO, ThriftRecordReader
-
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.path import TaskPath
-
-from gen.twitter.thermos.ttypes import (
-  ProcessState,
-  RunnerCkpt,
-  RunnerState)
-
-from .muxer import ProcessMuxer
-
-
-CheckpointInspection = namedtuple('CheckpointInspection',
-    ['runner_latest_update',
-     'process_latest_update',
-     'runner_processes',
-     'coordinator_processes',
-     'processes'])
-
-
-class CheckpointInspector(object):
-  def __init__(self, checkpoint_root):
-    self._path = TaskPath(root=checkpoint_root)
-
-  @staticmethod
-  def get_timestamp(process_record):
-    if process_record:
-      for timestamp in ('fork_time', 'start_time', 'stop_time'):
-        stamp = getattr(process_record, timestamp, None)
-        if stamp:
-          return stamp
-    return 0
-
-  def inspect(self, task_id):
-    """
-      Reconstructs the checkpoint stream and returns a CheckpointInspection.
-    """
-    dispatcher = CheckpointDispatcher()
-    state = RunnerState(processes = {})
-    muxer = ProcessMuxer(self._path.given(task_id=task_id))
-
-    runner_processes = []
-    coordinator_processes = set()
-    processes = set()
-
-    def consume_process_record(record):
-      if not record.process_status:
-        return
-      try:
-        user_uid = pwd.getpwnam(state.header.user).pw_uid
-      except KeyError:
-        log.error('Could not find user: %s' % state.header.user)
-        return
-      if record.process_status.state == ProcessState.FORKED:
-        coordinator_processes.add((record.process_status.coordinator_pid, user_uid,
-                                   record.process_status.fork_time))
-      elif record.process_status.state == ProcessState.RUNNING:
-        processes.add((record.process_status.pid, user_uid,
-                       record.process_status.start_time))
-
-    # replay runner checkpoint
-    runner_pid = None
-    runner_latest_update = 0
-    try:
-      with open(self._path.given(task_id=task_id).getpath('runner_checkpoint')) as fp:
-        with closing(ThriftRecordReader(fp, RunnerCkpt)) as ckpt:
-          for record in ckpt:
-            dispatcher.dispatch(state, record)
-            runner_latest_update = max(runner_latest_update,
-                self.get_timestamp(record.process_status))
-            # collect all bound runners
-            if record.task_status:
-              if record.task_status.runner_pid != runner_pid:
-                runner_processes.append((record.task_status.runner_pid,
-                                         record.task_status.runner_uid or 0,
-                                         record.task_status.timestamp_ms))
-                runner_pid = record.task_status.runner_pid
-            elif record.process_status:
-              consume_process_record(record)
-    except (IOError, OSError, RecordIO.Error) as err:
-      log.debug('Error inspecting task runner checkpoint: %s' % err)
-      return
-
-    # register existing processes in muxer
-    for process_name in state.processes:
-      muxer.register(process_name)
-
-    # read process checkpoints
-    process_latest_update = runner_latest_update
-    for record in muxer.select():
-      process_latest_update = max(process_latest_update, self.get_timestamp(record.process_status))
-      consume_process_record(record)
-
-    return CheckpointInspection(
-      runner_latest_update=runner_latest_update,
-      process_latest_update=process_latest_update,
-      runner_processes=runner_processes,
-      coordinator_processes=coordinator_processes,
-      processes=processes)
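
get_timestamp above returns the first populated timestamp on a process record, since a
record carries only the subset of fork_time/start_time/stop_time that its process has
reached. A self-contained sketch of the same pattern, using a namedtuple as a stand-in
for the thrift ProcessStatus struct:

  from collections import namedtuple

  # Stand-in for the thrift ProcessStatus; only the timestamp fields matter here.
  ProcessRecord = namedtuple('ProcessRecord', ['fork_time', 'start_time', 'stop_time'])

  def get_timestamp(process_record):
    if process_record:
      for field in ('fork_time', 'start_time', 'stop_time'):
        stamp = getattr(process_record, field, None)
        if stamp:
          return stamp
    return 0

  print(get_timestamp(ProcessRecord(fork_time=None, start_time=1388500000.5,
                                    stop_time=None)))  # => 1388500000.5
  print(get_timestamp(None))                           # => 0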

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/muxer.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/muxer.py b/src/main/python/twitter/thermos/core/muxer.py
deleted file mode 100644
index 9c0b389..0000000
--- a/src/main/python/twitter/thermos/core/muxer.py
+++ /dev/null
@@ -1,142 +0,0 @@
-import os
-import errno
-
-from twitter.common import log
-from twitter.common.recordio import ThriftRecordReader
-from gen.twitter.thermos.ttypes import RunnerCkpt
-
-
-class ProcessMuxer(object):
-  class ProcessExists(Exception): pass
-  class ProcessNotFound(Exception): pass
-  class CorruptCheckpoint(Exception): pass
-
-  def __init__(self, pathspec):
-    self._processes = {} # process_name => fp
-    self._watermarks = {} # process_name => sequence high watermark
-    self._pathspec = pathspec
-
-  def __del__(self):
-    for fp in filter(None, self._processes.values()):
-      fp.close()
-
-  def register(self, process_name, watermark=0):
-    log.debug('registering %s' % process_name)
-    if process_name in self._processes:
-      raise ProcessMuxer.ProcessExists("Process %s is already registered" % process_name)
-    self._processes[process_name] = None
-    self._watermarks[process_name] = watermark
-
-  def _bind_processes(self):
-    for process_name, fp in self._processes.items():
-      if fp is None:
-        process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
-        log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
-        try:
-          self._processes[process_name] = open(process_ckpt, 'r')
-        except IOError as e:
-          if e.errno == errno.ENOENT:
-            log.debug('  => bind failed, checkpoint not available yet.')
-            continue
-          else:
-            log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
-            continue
-        except Exception as e:
-          log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
-          continue
-        self._fast_forward_stream(process_name)
-
-  def _fast_forward_stream(self, process_name):
-    log.debug('Fast forwarding %s stream to seq=%s' % (process_name,
-      self._watermarks[process_name]))
-    assert self._processes.get(process_name) is not None
-    fp = self._processes[process_name]
-    rr = ThriftRecordReader(fp, RunnerCkpt)
-    current_watermark = -1
-    records = 0
-    while current_watermark < self._watermarks[process_name]:
-      last_pos = fp.tell()
-      record = rr.try_read()
-      if record is None:
-        break
-      new_watermark = record.process_status.seq
-      if new_watermark > self._watermarks[process_name]:
-        log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
-          process_name, new_watermark, self._watermarks[process_name]))
-        fp.seek(last_pos)
-        break
-      current_watermark = new_watermark
-      records += 1
-
-    if current_watermark < self._watermarks[process_name]:
-      log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
-         process_name, current_watermark, self._watermarks[process_name]))
-
-    if records:
-      log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
-        current_watermark))
-
-  def unregister(self, process_name):
-    log.debug('unregistering %s' % process_name)
-    if process_name not in self._processes:
-      raise ProcessMuxer.ProcessNotFound("No trace of process: %s" % process_name)
-    else:
-      self._watermarks.pop(process_name)
-      fp = self._processes.pop(process_name)
-      if fp is not None:
-        fp.close()
-
-  def has_data(self, process):
-    """
-      Return true if we think that there are updates available from the supplied process.
-    """
-    self._bind_processes()
-    # TODO(wickman) Should this raise ProcessNotFound?
-    if process not in self._processes:
-      return False
-    fp = self._processes[process]
-    if fp is None:
-      return False
-    rr = ThriftRecordReader(fp, RunnerCkpt)
-    old_pos = fp.tell()
-    try:
-      os.fstat(fp.fileno())  # only verifies that the descriptor is still valid
-    except OSError:
-      log.debug('ProcessMuxer could not fstat for process %s' % process)
-      return False
-    update = rr.try_read()
-    if update:
-      fp.seek(old_pos)
-      return True
-    return False
-
-  def select(self):
-    """
-      Read and multiplex checkpoint records from all the forked off process coordinators.
-
-      Checkpoint records can come from one of two places:
-        in-process: checkpoint records synthesized for FORKED and LOST events
-        out-of-process: checkpoint records read from the file descriptors of forked coordinators
-
-      Returns a list of RunnerCkpt objects that were successfully read, or an empty
-      list if none were read.
-    """
-    self._bind_processes()
-    updates = []
-    for handle in filter(None, self._processes.values()):
-      try:
-        fstat = os.fstat(handle.fileno())
-      except OSError:
-        log.error('Unable to fstat %s!' % handle.name)
-        continue
-      if handle.tell() > fstat.st_size:
-        log.error('Truncated checkpoint record detected on %s!' % handle.name)
-      elif handle.tell() < fstat.st_size:
-        rr = ThriftRecordReader(handle, RunnerCkpt)
-        while True:
-          process_update = rr.try_read()
-          if process_update:
-            updates.append(process_update)
-          else:
-            break
-    if updates:
-      log.debug('select() returning %s updates:' % len(updates))
-      for update in updates:
-        log.debug('  = %s' % update)
-    return updates
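
_fast_forward_stream above is a small cursor dance: keep reading records until the
sequence watermark is reached, remembering the offset before each read so that an
overshoot (a record past the watermark) can be undone with a seek back. A
self-contained sketch of the same logic over an in-memory list, where the Reader class
stands in for a ThriftRecordReader on a checkpoint file and each record is just its
sequence number:

  class Reader(object):
    """Stand-in for ThriftRecordReader: try_read() returns the next record or None."""
    def __init__(self, records):
      self._records, self._pos = records, 0
    def tell(self):
      return self._pos
    def seek(self, pos):
      self._pos = pos
    def try_read(self):
      if self._pos < len(self._records):
        record = self._records[self._pos]
        self._pos += 1
        return record
      return None

  def fast_forward(reader, watermark):
    """Advance past every record with seq <= watermark, rewinding on overshoot."""
    current = -1
    while current < watermark:
      last_pos = reader.tell()
      record = reader.try_read()
      if record is None:
        break
      if record > watermark:
        reader.seek(last_pos)  # rewind: leave the overshoot record for the caller
        break
      current = record
    return current

  r = Reader([0, 1, 2, 5, 6])
  print(fast_forward(r, 3))  # => 2 (seq 5 overshoots the watermark)
  print(r.try_read())        # => 5 (the overshoot record is still unconsumed)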

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/process.py b/src/main/python/twitter/thermos/core/process.py
deleted file mode 100644
index dd1d62a..0000000
--- a/src/main/python/twitter/thermos/core/process.py
+++ /dev/null
@@ -1,371 +0,0 @@
-"""Run processes of a Thermos task.
-
-This module contains the Process class, used to manage the execution of the constituent processes of
-a Thermos task. Each process is represented by a "coordinator" process, which fires off the actual
-commandline in a subprocess of its own.
-
-"""
-
-from abc import abstractmethod
-import getpass
-import grp
-import os
-import pwd
-import signal
-import subprocess
-import sys
-import time
-
-from twitter.common import log
-from twitter.common.dirutil import (
-    lock_file,
-    safe_mkdir,
-    safe_open,
-)
-from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Time
-from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter
-
-from gen.twitter.thermos.ttypes import (
-    ProcessState,
-    ProcessStatus,
-    RunnerCkpt,
-)
-
-
-class Platform(Interface):
-  """Abstract representation of a platform encapsulating system-level functions"""
-  @abstractmethod
-  def clock(self):
-    pass
-
-  @abstractmethod
-  def fork(self):
-    pass
-
-  @abstractmethod
-  def getpid(self):
-    pass
-
-
-class ProcessBase(object):
-  """
-    Encapsulate a running process for a task.
-  """
-  class Error(Exception): pass
-  class UnknownUserError(Error): pass
-  class CheckpointError(Error): pass
-  class UnspecifiedSandbox(Error): pass
-  class PermissionError(Error): pass
-
-  CONTROL_WAIT_CHECK_INTERVAL = Amount(100, Time.MILLISECONDS)
-  MAXIMUM_CONTROL_WAIT = Amount(1, Time.MINUTES)
-
-  def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None):
-    """
-      required:
-        name        = name of the process
-        cmdline     = cmdline of the process
-        sequence    = the next available sequence number for state updates
-        pathspec    = TaskPath object for synthesizing path names
-        sandbox_dir = the sandbox in which to run the process
-        platform    = Platform providing fork, clock, getpid
-
-      optional:
-        user        = the user to run as (if unspecified, defaults to the current user).
-                      running as a user other than the current user requires root access
-    """
-    self._name = name
-    self._cmdline = cmdline
-    self._pathspec = pathspec
-    self._seq = sequence
-    self._sandbox = sandbox_dir
-    if self._sandbox:
-      safe_mkdir(self._sandbox)
-    self._pid = None
-    self._fork_time = None
-    self._stdout = None
-    self._stderr = None
-    self._user = user
-    if self._user:
-      user, current_user = self._getpwuid() # may raise self.UnknownUserError
-      if user != current_user and os.geteuid() != 0:
-        raise self.PermissionError('Must be root to run processes as other users!')
-    self._ckpt = None
-    self._ckpt_head = -1
-    if platform is None:
-      raise ValueError("Platform must be specified")
-    self._platform = platform
-
-  def _log(self, msg):
-    log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg))
-
-  def _ckpt_write(self, msg):
-    self._init_ckpt_if_necessary()
-    self._log("child state transition [%s] <= %s" % (self.ckpt_file(), msg))
-    self._ckpt.write(msg)
-
-  def _write_process_update(self, **kw):
-    """Write a process update to the coordinator's checkpoint stream."""
-    process_status = ProcessStatus(**kw)
-    process_status.seq = self._seq
-    process_status.process = self.name()
-    self._ckpt_write(RunnerCkpt(process_status=process_status))
-    self._seq += 1
-
-  def _write_initial_update(self):
-    self._write_process_update(state=ProcessState.FORKED,
-                               fork_time=self._fork_time,
-                               coordinator_pid=self._pid)
-
-  def cmdline(self):
-    return self._cmdline
-
-  def name(self):
-    return self._name
-
-  def pid(self):
-    """pid of the coordinator"""
-    return self._pid
-
-  def rebind(self, pid, fork_time):
-    """rebind Process to an existing coordinator pid without forking"""
-    self._pid = pid
-    self._fork_time = fork_time
-
-  def ckpt_file(self):
-    return self._pathspec.getpath('process_checkpoint')
-
-  def _setup_ckpt(self):
-    """Set up the checkpoint: must be run on the parent."""
-    self._log('initializing checkpoint file: %s' % self.ckpt_file())
-    ckpt_fp = lock_file(self.ckpt_file(), "a+")
-    if ckpt_fp in (None, False):
-      raise self.CheckpointError('Could not acquire checkpoint permission or lock for %s!' %
-        self.ckpt_file())
-    self._ckpt_head = os.path.getsize(self.ckpt_file())
-    ckpt_fp.seek(self._ckpt_head)
-    self._ckpt = ThriftRecordWriter(ckpt_fp)
-    self._ckpt.set_sync(True)
-
-  def _init_ckpt_if_necessary(self):
-    if self._ckpt is None:
-      self._setup_ckpt()
-
-  def _wait_for_control(self):
-    """Wait for control of the checkpoint stream: must be run in the child."""
-    total_wait_time = Amount(0, Time.SECONDS)
-
-    with open(self.ckpt_file(), 'r') as fp:
-      fp.seek(self._ckpt_head)
-      rr = ThriftRecordReader(fp, RunnerCkpt)
-      while total_wait_time < self.MAXIMUM_CONTROL_WAIT:
-        ckpt_tail = os.path.getsize(self.ckpt_file())
-        if ckpt_tail == self._ckpt_head:
-          self._platform.clock().sleep(self.CONTROL_WAIT_CHECK_INTERVAL.as_(Time.SECONDS))
-          total_wait_time += self.CONTROL_WAIT_CHECK_INTERVAL
-          continue
-        checkpoint = rr.try_read()
-        if checkpoint:
-          if not checkpoint.process_status:
-            raise self.CheckpointError('No process status in checkpoint!')
-          if (checkpoint.process_status.process != self.name() or
-              checkpoint.process_status.state != ProcessState.FORKED or
-              checkpoint.process_status.fork_time != self._fork_time or
-              checkpoint.process_status.coordinator_pid != self._pid):
-            self._log('Losing control of the checkpoint stream:')
-            self._log('   fork_time [%s] vs self._fork_time [%s]' % (
-                checkpoint.process_status.fork_time, self._fork_time))
-            self._log('   coordinator_pid [%s] vs self._pid [%s]' % (
-                checkpoint.process_status.coordinator_pid, self._pid))
-            raise self.CheckpointError('Lost control of the checkpoint stream!')
-          self._log('Taking control of the checkpoint stream at record: %s' %
-            checkpoint.process_status)
-          self._seq = checkpoint.process_status.seq + 1
-          return True
-    raise self.CheckpointError('Timed out waiting for checkpoint stream!')
-
-  def _prepare_fork(self):
-    user, _ = self._getpwuid()
-    self._fork_time = self._platform.clock().time()
-    self._setup_ckpt()
-    self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "w")
-    self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "w")
-    os.chown(self._stdout.name, user.pw_uid, user.pw_gid)
-    os.chown(self._stderr.name, user.pw_uid, user.pw_gid)
-
-  def _finalize_fork(self):
-    self._write_initial_update()
-    self._ckpt.close()
-    self._ckpt = None
-
-  def _getpwuid(self):
-    """Returns a tuple of the user (i.e. --user) and current user."""
-    try:
-      current_user = pwd.getpwuid(os.getuid())
-    except KeyError:
-      raise self.UnknownUserError('Unable to get pwent information for uid %s!' % os.getuid())
-    try:
-      user = pwd.getpwnam(self._user) if self._user else current_user
-    except KeyError:
-      raise self.UnknownUserError('Unknown user %s!' % self._user)
-    return user, current_user
-
-  def start(self):
-    """
-      This is the main entry point from the runner; it forks a coordinator process to
-      run the target process (i.e. self.cmdline()).
-
-      The parent returns immediately and records the pid of the coordinator.
-      The child (coordinator) launches the target process in a subprocess.
-    """
-    self._prepare_fork()
-    self._pid = self._platform.fork()
-    if self._pid == 0:
-      self._pid = self._platform.getpid()
-      self._wait_for_control()
-      try:
-        self.execute()
-      finally:
-        self._ckpt.close()
-        self.finish()
-    else:
-      self._finalize_fork()
-
-  def execute(self):
-    raise NotImplementedError
-
-  def finish(self):
-    pass
-
-
-class RealPlatform(Platform):
-  IGNORE_SIGNALS = (signal.SIGINT,)
-
-  def __init__(self, fork=os.fork):
-    self._fork = fork
-
-  def fork(self):
-    pid = self._fork()
-    if pid == 0:
-      self._sanitize()
-    return pid
-
-  def _sanitize(self):
-    for sig in self.IGNORE_SIGNALS:
-      signal.signal(sig, signal.SIG_IGN)
-
-  def getpid(self):
-    return os.getpid()
-
-  def clock(self):
-    return time
-
-
-class Process(ProcessBase):
-  """
-    Encapsulate a running process for a task.
-  """
-  RCFILE = '.thermos_profile'
-  FD_CLOEXEC = True
-
-  def __init__(self, *args, **kw):
-    """
-      See ProcessBase.__init__
-
-      Takes additional arguments:
-        fork: the fork function to use [default: os.fork]
-        chroot: whether or not to chroot into the sandbox [default: False]
-    """
-    fork = kw.pop('fork', os.fork)
-    self._use_chroot = bool(kw.pop('chroot', False))
-    self._rc = None
-    kw['platform'] = RealPlatform(fork=fork)
-    ProcessBase.__init__(self, *args, **kw)
-    if self._use_chroot and self._sandbox is None:
-      raise self.UnspecifiedSandbox('If using chroot, must specify sandbox!')
-
-  def _chroot(self):
-    """chdir and chroot to the sandbox directory."""
-    os.chdir(self._sandbox)
-    os.chroot(self._sandbox)
-
-  def _setuid(self):
-    """Drop privileges to the user supplied in Process creation (if necessary.)"""
-    user, current_user = self._getpwuid()
-    if user.pw_uid == current_user.pw_uid:
-      return
-
-    uid, gid = user.pw_uid, user.pw_gid
-    username = user.pw_name
-    group_ids = [group.gr_gid for group in grp.getgrall() if username in group.gr_mem]
-    os.setgroups(group_ids)
-    os.setgid(gid)
-    os.setuid(uid)
-
-  def execute(self):
-    """Perform final initialization and launch target process commandline in a subprocess."""
-    if not self._stderr:
-      raise RuntimeError('self._stderr not set up!')
-    if not self._stdout:
-      raise RuntimeError('self._stdout not set up!')
-
-    user, _ = self._getpwuid()
-    username, homedir = user.pw_name, user.pw_dir
-
-    # TODO(wickman) reconsider setsid now that we're invoking in a subshell
-    os.setsid()
-    if self._use_chroot:
-      self._chroot()
-    self._setuid()
-
-    # start process
-    start_time = self._platform.clock().time()
-
-    if not self._sandbox:
-      sandbox = os.getcwd()
-    else:
-      sandbox = self._sandbox if not self._use_chroot else '/'
-
-    thermos_profile = os.path.join(sandbox, self.RCFILE)
-    env = {
-      'HOME': homedir if self._use_chroot else sandbox,
-      'LOGNAME': username,
-      'USER': username,
-      'PATH': os.environ['PATH']
-    }
-
-    if os.path.exists(thermos_profile):
-      env.update(BASH_ENV=thermos_profile)
-
-    self._popen = subprocess.Popen(["/bin/bash", "-c", self.cmdline()],
-                                   stderr=self._stderr,
-                                   stdout=self._stdout,
-                                   close_fds=self.FD_CLOEXEC,
-                                   cwd=sandbox,
-                                   env=env)
-
-    self._write_process_update(state=ProcessState.RUNNING,
-                               pid=self._popen.pid,
-                               start_time=start_time)
-
-    # wait for job to finish
-    rc = self._popen.wait()
-
-    # indicate that we have finished/failed
-    if rc < 0:
-      state = ProcessState.KILLED
-    elif rc == 0:
-      state = ProcessState.SUCCESS
-    else:
-      state = ProcessState.FAILED
-
-    self._write_process_update(state=state,
-                               return_code=rc,
-                               stop_time=self._platform.clock().time())
-    self._rc = rc
-
-  def finish(self):
-    self._log('Coordinator exiting.')
-    sys.exit(0)
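
Process.execute maps the child's exit status to a terminal state: a negative return
code from Popen.wait() means the child died on a signal, zero is success, and anything
else is failure. A runnable sketch of just that mapping, stripped of the checkpointing,
setuid, and chroot machinery (state names are plain strings standing in for the thrift
ProcessState enum):

  import subprocess

  def classify(rc):
    if rc < 0:
      return 'KILLED'   # terminated by signal -rc
    elif rc == 0:
      return 'SUCCESS'
    return 'FAILED'

  def run(cmdline):
    # Like the coordinator, run the commandline through a shell.
    popen = subprocess.Popen(['/bin/bash', '-c', cmdline])
    rc = popen.wait()
    return rc, classify(rc)

  print(run('true'))    # => (0, 'SUCCESS')
  print(run('exit 3'))  # => (3, 'FAILED')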

