aurora-commits mailing list archives

From wfar...@apache.org
Subject [24/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/BUILD b/src/main/python/apache/aurora/executor/common/BUILD
new file mode 100644
index 0000000..9c932d1
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/BUILD
@@ -0,0 +1,69 @@
+python_library(
+  name = 'status_checker',
+  sources = ['status_checker.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
+  ]
+)
+
+python_library(
+  name = 'task_runner',
+  sources = ['task_runner.py'],
+  dependencies = [
+    pants(':status_checker'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+  ]
+)
+
+python_library(
+  name = 'health_checker',
+  sources = ['health_checker.py'],
+  dependencies = [
+    pants(':status_checker'),
+    pants(':task_info'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+    pants('src/main/python/twitter/aurora/common:http_signaler'),
+  ]
+)
+
+python_library(
+  name = 'executor_timeout',
+  sources = ['executor_timeout.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+  ]
+)
+
+python_library(
+  name = 'kill_manager',
+  sources = ['kill_manager.py'],
+  dependencies = [
+    pants(':status_checker'),
+  ]
+)
+
+python_library(
+  name = 'sandbox',
+  sources = ['sandbox.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+  ]
+)
+
+python_library(
+  name = 'task_info',
+  sources = ['task_info.py'],
+  dependencies = [
+    pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/config'),
+    pants('src/main/python/twitter/aurora/config/schema'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
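
For illustration only: a hypothetical downstream BUILD target that consumes the libraries defined above (the target address assumes the post-rename location of this BUILD file):

  python_library(
    name = 'example_consumer',
    sources = ['example_consumer.py'],
    dependencies = [
      pants('src/main/python/apache/aurora/executor/common:status_checker'),
      pants('src/main/python/apache/aurora/executor/common:health_checker'),
    ]
  )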

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/__init__.py b/src/main/python/apache/aurora/executor/common/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/executor_timeout.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/executor_timeout.py b/src/main/python/apache/aurora/executor/common/executor_timeout.py
new file mode 100644
index 0000000..2828973
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/executor_timeout.py
@@ -0,0 +1,21 @@
+from twitter.common import log
+from twitter.common.quantity import Amount, Time
+from twitter.common.exceptions import ExceptionalThread
+
+
+class ExecutorTimeout(ExceptionalThread):
+  DEFAULT_TIMEOUT = Amount(10, Time.SECONDS)
+
+  def __init__(self, event, driver, logger=log.error, timeout=DEFAULT_TIMEOUT):
+    self._event = event
+    self._logger = logger
+    self._driver = driver
+    self._timeout = timeout
+    super(ExecutorTimeout, self).__init__()
+    self.daemon = True
+
+  def run(self):
+    self._event.wait(self._timeout.as_(Time.SECONDS))
+    if not self._event.is_set():
+      self._logger('Executor timing out.')
+      self._driver.stop()
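
A minimal usage sketch (not part of the diff; FakeDriver stands in for a real MesosExecutorDriver, of which only stop() is exercised here):

  import threading

  from twitter.common.quantity import Amount, Time

  class FakeDriver(object):
    # stand-in for a MesosExecutorDriver; only stop() is needed by ExecutorTimeout
    def stop(self):
      print('driver stopped')

  started = threading.Event()
  timer = ExecutorTimeout(started, FakeDriver(), timeout=Amount(1, Time.SECONDS))
  timer.start()
  # If started.set() is not called within the timeout, run() logs and calls driver.stop().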

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
new file mode 100644
index 0000000..3b25f0e
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -0,0 +1,99 @@
+import threading
+import time
+
+from twitter.aurora.common.http_signaler import HttpSignaler
+from twitter.common import log
+from twitter.common.exceptions import ExceptionalThread
+
+from .status_checker import (
+    ExitState,
+    StatusChecker,
+    StatusCheckerProvider,
+    StatusResult,
+)
+from .task_info import mesos_task_instance_from_assigned_task, resolve_ports
+
+
+class HealthCheckerThread(StatusChecker, ExceptionalThread):
+  """Generic, StatusChecker-conforming thread for arbitrary periodic health checks
+
+    health_checker should be a callable returning a tuple of (boolean, reason), indicating
+    respectively the health of the service and the reason for its failure (or None if the service is
+    still healthy).
+  """
+  def __init__(self,
+               health_checker,
+               interval_secs=30,
+               initial_interval_secs=None,
+               max_consecutive_failures=0,
+               clock=time):
+    self._checker = health_checker
+    self._interval = interval_secs
+    if initial_interval_secs is not None:
+      self._initial_interval = initial_interval_secs
+    else:
+      self._initial_interval = interval_secs * 2
+    self._current_consecutive_failures = 0
+    self._max_consecutive_failures = max_consecutive_failures
+    self._dead = threading.Event()
+    if self._initial_interval > 0:
+      self._healthy, self._reason = True, None
+    else:
+      self._healthy, self._reason = self._checker()
+    self._clock = clock
+    super(HealthCheckerThread, self).__init__()
+    self.daemon = True
+
+  @property
+  def status(self):
+    if not self._healthy:
+      return StatusResult('Failed health check! %s' % self._reason, ExitState.FAILED)
+
+  def run(self):
+    log.debug('Health checker thread started.')
+    self._clock.sleep(self._initial_interval)
+    log.debug('Initial interval expired.')
+    while not self._dead.is_set():
+      self._maybe_update_failure_count(*self._checker())
+      self._clock.sleep(self._interval)
+
+  def _maybe_update_failure_count(self, is_healthy, reason):
+    if not is_healthy:
+      log.warning('Health check failure: %s' % reason)
+      self._current_consecutive_failures += 1
+      if self._current_consecutive_failures > self._max_consecutive_failures:
+        log.warning('Reached consecutive failure limit.')
+        self._healthy = False
+        self._reason = reason
+    else:
+      if self._current_consecutive_failures > 0:
+        log.debug('Reset consecutive failures counter.')
+      self._current_consecutive_failures = 0
+
+  def start(self):
+    StatusChecker.start(self)
+    ExceptionalThread.start(self)
+
+  def stop(self):
+    log.debug('Health checker thread stopped.')
+    self._dead.set()
+
+
+class HealthCheckerProvider(StatusCheckerProvider):
+  def from_assigned_task(self, assigned_task, _):
+    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
+
+    if 'health' not in portmap:
+      return None
+
+    health_check_config = mesos_task.health_check_config().get()
+    http_signaler = HttpSignaler(
+        portmap['health'],
+        timeout_secs=health_check_config.get('timeout_secs'))
+    health_checker = HealthCheckerThread(
+        http_signaler.health,
+        interval_secs=health_check_config.get('interval_secs'),
+        initial_interval_secs=health_check_config.get('initial_interval_secs'),
+        max_consecutive_failures=health_check_config.get('max_consecutive_failures'))
+    return health_checker
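
A sketch of driving HealthCheckerThread directly with an arbitrary callable (the interval values are illustrative):

  def check():
    # Return (is_healthy, reason); reason is None while the service is healthy.
    return True, None

  checker = HealthCheckerThread(check, interval_secs=5, max_consecutive_failures=2)
  checker.start()
  # checker.status is None while healthy; once the consecutive-failure limit is
  # exceeded it becomes a StatusResult carrying ExitState.FAILED.
  checker.stop()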

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/kill_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/kill_manager.py b/src/main/python/apache/aurora/executor/common/kill_manager.py
new file mode 100644
index 0000000..70ab733
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/kill_manager.py
@@ -0,0 +1,19 @@
+from .status_checker import ExitState, StatusChecker, StatusResult
+
+
+class KillManager(StatusChecker):
+  """
+    A health interface that provides a kill-switch for a task monitored by the status manager.
+  """
+  def __init__(self):
+    self._killed = False
+    self._reason = None
+
+  @property
+  def status(self):
+    if self._killed:
+      return StatusResult(self._reason, ExitState.KILLED)
+
+  def kill(self, reason):
+    self._reason = reason
+    self._killed = True
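
Usage is a straightforward flip of the kill switch, e.g.:

  km = KillManager()
  assert km.status is None
  km.kill('operator requested kill')
  # km.status is now a StatusResult with ExitState.KILLED and the given reason.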

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/sandbox.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/sandbox.py b/src/main/python/apache/aurora/executor/common/sandbox.py
new file mode 100644
index 0000000..551d360
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/sandbox.py
@@ -0,0 +1,72 @@
+from abc import abstractmethod, abstractproperty
+import getpass
+import grp
+import os
+import pwd
+
+from twitter.common import log
+from twitter.common.dirutil import safe_mkdir, safe_rmtree
+from twitter.common.lang import Interface
+
+
+class SandboxInterface(Interface):
+  class Error(Exception): pass
+  class CreationError(Error): pass
+  class DeletionError(Error): pass
+
+  @abstractproperty
+  def root(self):
+    """Return the root path of the sandbox."""
+
+  @abstractproperty
+  def chrooted(self):
+    """Returns whether or not the sandbox is a chroot."""
+
+  @abstractmethod
+  def exists(self):
+    """Returns true if the sandbox appears to exist."""
+
+  @abstractmethod
+  def create(self, *args, **kw):
+    """Create the sandbox."""
+
+  @abstractmethod
+  def destroy(self, *args, **kw):
+    """Destroy the sandbox."""
+
+
+class SandboxProvider(Interface):
+  @abstractmethod
+  def from_assigned_task(self, assigned_task):
+    """Return the appropriate Sandbox implementation from an AssignedTask."""
+
+
+class DirectorySandbox(SandboxInterface):
+  """ Basic sandbox implementation using a directory on the filesystem """
+  def __init__(self, root, user=getpass.getuser()):
+    self._root = root
+    self._user = user
+
+  @property
+  def root(self):
+    return self._root
+
+  @property
+  def chrooted(self):
+    return False
+
+  def exists(self):
+    return os.path.exists(self.root)
+
+  def create(self):
+    log.debug('DirectorySandbox: mkdir %s' % self.root)
+    safe_mkdir(self.root)
+    pwent = pwd.getpwnam(self._user)
+    grent = grp.getgrgid(pwent.pw_gid)
+    log.debug('DirectorySandbox: chown %s:%s %s' % (self._user, grent.gr_name, self.root))
+    os.chown(self.root, pwent.pw_uid, pwent.pw_gid)
+    log.debug('DirectorySandbox: chmod 700 %s' % self.root)
+    os.chmod(self.root, 0700)
+
+  def destroy(self):
+    safe_rmtree(self.root)
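
A sketch of the DirectorySandbox lifecycle (the path is an assumption for illustration):

  sandbox = DirectorySandbox('/tmp/example_sandbox')
  if not sandbox.exists():
    sandbox.create()   # mkdir, chown to the invoking user, chmod 700
  # ... run the task inside sandbox.root ...
  sandbox.destroy()    # safe_rmtree of the sandbox directory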

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/status_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py
new file mode 100644
index 0000000..efffa8f
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/status_checker.py
@@ -0,0 +1,111 @@
+from abc import abstractmethod, abstractproperty
+
+from twitter.common import log
+from twitter.common.lang import Interface
+from twitter.common.metrics import NamedGauge, Observable
+
+
+# This mirrors mesos_pb2 TaskStatus without explicitly depending upon it.
+#
+# The dependency is a 30MB egg, so for smaller applications that just need
+# the status text, we proxy them.  The actual conversion between ExitState
+# and TaskStatus is done in the StatusManager.
+class ExitState(object):
+  FAILED = object()
+  FINISHED = object()
+  KILLED = object()
+  LOST = object()
+
+  ALL_STATES = {
+    FAILED: 'FAILED',
+    FINISHED: 'FINISHED',
+    KILLED: 'KILLED',
+    LOST: 'LOST',
+  }
+
+
+class StatusResult(object):
+  """
+    Encapsulates a reason for failure and the associated ExitState status
+    (e.g. ExitState.FAILED).
+  """
+
+  def __init__(self, reason, status):
+    self._reason = reason
+    if status not in ExitState.ALL_STATES:
+      raise ValueError('Unknown task state: %r' % status)
+    self._status = status
+
+  @property
+  def reason(self):
+    return self._reason
+
+  @property
+  def status(self):
+    return self._status
+
+  def __repr__(self):
+    return '%s(%r, status=%r)' % (
+        self.__class__.__name__,
+        self._reason,
+        ExitState.ALL_STATES[self._status])
+
+
+class StatusChecker(Observable, Interface):
+  """Interface to pluggable status checkers for the Aurora Executor."""
+
+  @abstractproperty
+  def status(self):
+    """Return None under normal operations.  Return StatusResult to indicate status proposal."""
+
+  def start(self):
+    """Invoked once the task has been started."""
+    self.metrics.register(NamedGauge('enabled', 1))
+
+  def stop(self):
+    """Invoked once a non-None status has been reported."""
+    pass
+
+
+class StatusCheckerProvider(Interface):
+  @abstractmethod
+  def from_assigned_task(self, assigned_task, sandbox):
+    pass
+
+
+class Healthy(StatusChecker):
+  @property
+  def status(self):
+    return None
+
+
+class ChainedStatusChecker(StatusChecker):
+  def __init__(self, status_checkers):
+    self._status_checkers = status_checkers
+    self._status = None
+    if not all(isinstance(h_i, StatusChecker) for h_i in status_checkers):
+      raise TypeError('ChainedStatusChecker must take an iterable of StatusCheckers.')
+    super(ChainedStatusChecker, self).__init__()
+
+  @property
+  def status(self):
+    if self._status is None:
+      for status_checker in self._status_checkers:
+        log.debug('Checking status from %s' % status_checker.__class__.__name__)
+        status_checker_status = status_checker.status
+        if status_checker_status is not None:
+          log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status))
+          if not isinstance(status_checker_status, StatusResult):
+            raise TypeError('StatusChecker returned something other than a StatusResult: got %s' %
+                type(status_checker_status))
+          self._status = status_checker_status
+          break
+    return self._status
+
+  def start(self):
+    for status_checker in self._status_checkers:
+      status_checker.start()
+
+  def stop(self):
+    for status_checker in self._status_checkers:
+      status_checker.stop()
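
A sketch of chaining checkers; AlwaysFailed is a throwaway checker defined only for this example:

  class AlwaysFailed(StatusChecker):
    @property
    def status(self):
      return StatusResult('synthetic failure', ExitState.FAILED)

  chained = ChainedStatusChecker([Healthy(), AlwaysFailed()])
  chained.start()
  print(chained.status)   # the first non-None result wins: the StatusResult from AlwaysFailed
  chained.stop()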

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/task_info.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/task_info.py b/src/main/python/apache/aurora/executor/common/task_info.py
new file mode 100644
index 0000000..1a7cf27
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/task_info.py
@@ -0,0 +1,90 @@
+import json
+
+from twitter.aurora.config.port_resolver import PortResolver
+from twitter.aurora.config.schema.base import MesosJob, MesosTaskInstance
+from twitter.aurora.config.thrift import task_instance_from_job
+from twitter.common import log
+
+from gen.twitter.aurora.ttypes import AssignedTask
+
+from pystachio import Ref
+from thrift.Thrift import TException
+from thrift.TSerialization import deserialize as thrift_deserialize
+
+
+def assigned_task_from_mesos_task(task):
+  """Deserialize AssignedTask from a launchTask task protocol buffer."""
+  try:
+    assigned_task = thrift_deserialize(AssignedTask(), task.data)
+  except (EOFError, TException) as e:
+    raise ValueError('Could not deserialize task! %s' % e)
+  return assigned_task
+
+
+def mesos_job_from_assigned_task(assigned_task):
+  """Deserialize a MesosJob pystachio struct from an AssignedTask."""
+  thermos_task = assigned_task.task.executorConfig.data
+  try:
+    json_blob = json.loads(thermos_task)
+  except (TypeError, ValueError):
+    return None
+  if 'instance' in json_blob:
+    # This is a MesosTaskInstance so we cannot get a MesosJob from this assigned_task
+    return None
+  return MesosJob.json_loads(thermos_task)
+
+
+def mesos_task_instance_from_assigned_task(assigned_task):
+  """Deserialize MesosTaskInstance from an AssignedTask thrift."""
+  thermos_task = assigned_task.task.executorConfig.data
+
+  if not thermos_task:
+    raise ValueError('Task did not have a thermos config!')
+
+  try:
+    json_blob = json.loads(thermos_task)
+  except (TypeError, ValueError) as e:
+    raise ValueError('Could not deserialize thermos config: %s' % e)
+
+  # As part of the transition for MESOS-2133, we can send either a MesosTaskInstance
+  # or we can be sending a MesosJob.  So handle both possible cases.  Once everyone
+  # is using MesosJob, then we can begin to leverage additional information that
+  # becomes available such as cluster.
+  if 'instance' in json_blob:
+    return MesosTaskInstance.json_loads(thermos_task)
+
+  # This is a MesosJob
+  mti, refs = task_instance_from_job(MesosJob.json_loads(thermos_task), assigned_task.instanceId)
+  for ref in refs:
+    # If the ref is {{thermos.task_id}} or a subscope of
+    # {{thermos.ports}}, it currently gets bound by the Thermos Runner,
+    # so we must leave them unbound.
+    #
+    # {{thermos.user}} is a legacy binding which we can safely ignore.
+    #
+    # TODO(wickman) These should be rewritten by the mesos client to use
+    # %%style%% replacements in order to allow us to better type-check configs
+    # client-side.
+    if ref == Ref.from_address('thermos.task_id'):
+      continue
+    if Ref.subscope(Ref.from_address('thermos.ports'), ref):
+      continue
+    if ref == Ref.from_address('thermos.user'):
+      continue
+    raise ValueError('Unexpected unbound refs: %s' % ' '.join(map(str, refs)))
+  return mti
+
+
+def resolve_ports(mesos_task, portmap):
+  """Given a MesosTaskInstance and the portmap of resolved ports from the scheduler,
+     create a fully resolved map of port name => port number for the thermos
+     runner and discovery manager."""
+  task_portmap = mesos_task.announce().portmap().get() if mesos_task.has_announce() else {}
+  task_portmap.update(portmap)
+  task_portmap = PortResolver.resolve(task_portmap)
+
+  for name, port in task_portmap.items():
+    if not isinstance(port, int):
+      log.warning('Task has unmapped port: %s => %s' % (name, port))
+
+  return dict((name, port) for (name, port) in task_portmap.items() if isinstance(port, int))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/common/task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/task_runner.py b/src/main/python/apache/aurora/executor/common/task_runner.py
new file mode 100644
index 0000000..c7a57cb
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/task_runner.py
@@ -0,0 +1,20 @@
+from abc import abstractmethod
+
+from twitter.common.lang import Interface
+
+from .status_checker import StatusChecker
+
+
+class TaskError(Exception):
+  pass
+
+
+class TaskRunner(StatusChecker):
+  # For now, TaskRunner should just maintain the StatusChecker API.
+  pass
+
+
+class TaskRunnerProvider(Interface):
+  @abstractmethod
+  def from_assigned_task(self, assigned_task, sandbox):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/executor_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/executor_base.py b/src/main/python/apache/aurora/executor/executor_base.py
new file mode 100644
index 0000000..4fd9aa4
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/executor_base.py
@@ -0,0 +1,103 @@
+from twitter.common import log
+
+from gen.twitter.aurora.constants import TERMINAL_STATES as AURORA_TERMINAL_STATES
+from gen.twitter.aurora.ttypes import ScheduleStatus
+from gen.twitter.thermos.ttypes import TaskState
+
+import mesos
+import mesos_pb2 as mesos_pb
+
+
+class ThermosExecutorBase(mesos.Executor):
+  # Statuses are hard, let's go shopping.
+  MESOS_STATES = {
+      mesos_pb.TASK_STARTING: 'STARTING',
+      mesos_pb.TASK_RUNNING: 'RUNNING',
+      mesos_pb.TASK_FINISHED: 'FINISHED',
+      mesos_pb.TASK_FAILED: 'FAILED',
+      mesos_pb.TASK_KILLED: 'KILLED',
+      mesos_pb.TASK_LOST: 'LOST',
+  }
+
+  THERMOS_TO_MESOS_STATES = {
+      TaskState.ACTIVE: mesos_pb.TASK_RUNNING,
+      TaskState.SUCCESS: mesos_pb.TASK_FINISHED,
+      TaskState.FAILED: mesos_pb.TASK_FAILED,
+      TaskState.KILLED: mesos_pb.TASK_KILLED,
+      TaskState.LOST: mesos_pb.TASK_LOST,
+  }
+
+  THERMOS_TO_TWITTER_STATES = {
+      TaskState.ACTIVE: ScheduleStatus.RUNNING,
+      TaskState.CLEANING: ScheduleStatus.RUNNING,
+      TaskState.FINALIZING: ScheduleStatus.RUNNING,
+      TaskState.SUCCESS: ScheduleStatus.FINISHED,
+      TaskState.FAILED: ScheduleStatus.FAILED,
+      TaskState.KILLED: ScheduleStatus.KILLED,
+      TaskState.LOST: ScheduleStatus.LOST,
+  }
+
+  @staticmethod
+  def twitter_status_is_terminal(status):
+    return status in AURORA_TERMINAL_STATES
+
+  @staticmethod
+  def mesos_status_is_terminal(status):
+    return status in (
+        mesos_pb.TASK_FAILED,
+        mesos_pb.TASK_FINISHED,
+        mesos_pb.TASK_KILLED,
+        mesos_pb.TASK_LOST,
+    )
+
+  @staticmethod
+  def thermos_status_is_terminal(status):
+    return status in (
+        TaskState.FAILED,
+        TaskState.KILLED,
+        TaskState.LOST,
+        TaskState.SUCCESS,
+    )
+
+  def __init__(self):
+    self._slave_id = None
+
+  def log(self, msg):
+    log.info('Executor [%s]: %s' % (self._slave_id, msg))
+
+  def registered(self, driver, executor_info, framework_info, slave_info):
+    self.log('registered() called with:')
+    self.log('   ExecutorInfo:  %s' % executor_info)
+    self.log('   FrameworkInfo: %s' % framework_info)
+    self.log('   SlaveInfo:     %s' % slave_info)
+    self._driver = driver
+    self._executor_info = executor_info
+    self._framework_info = framework_info
+    self._slave_info = slave_info
+
+  def reregistered(self, driver, slave_info):
+    self.log('reregistered() called with:')
+    self.log('   SlaveInfo:     %s' % slave_info)
+
+  def disconnected(self, driver):
+    self.log('disconnected() called')
+
+  def send_update(self, driver, task_id, state, message=None):
+    update = mesos_pb.TaskStatus()
+    if not isinstance(state, int):
+      raise TypeError('Invalid state type %s, should be int.' % type(state))
+    if state not in self.MESOS_STATES:
+      raise ValueError('Invalid state: %s' % state)
+    update.state = state
+    update.task_id.value = task_id
+    if message:
+      update.message = str(message)
+    self.log('Updating %s => %s' % (task_id, self.MESOS_STATES[state]))
+    self.log('   Reason: %s' % message)
+    driver.sendStatusUpdate(update)
+
+  def frameworkMessage(self, driver, message):
+    self.log('frameworkMessage() got message: %s, ignoring.' % message)
+
+  def error(self, driver, message):
+    self.log('Received error message: %s' % message)
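
A sketch of building on this base class, using only the imports above; the driver and TaskInfo come from the standard Mesos executor callbacks:

  class NoopExecutor(ThermosExecutorBase):
    def launchTask(self, driver, task):
      # Immediately report the task as finished without doing any work.
      self.send_update(driver, task.task_id.value, mesos_pb.TASK_FINISHED, 'noop')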

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/executor_detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/executor_detector.py b/src/main/python/apache/aurora/executor/executor_detector.py
new file mode 100644
index 0000000..c26c00e
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/executor_detector.py
@@ -0,0 +1,77 @@
+import os
+from glob import glob
+
+from twitter.common.string import ScanfParser
+
+
+# TODO(wickman) MESOS-2805  This makes an assumption about the directory
+# layout Mesos provides.  Ideally we never need to do this and we should
+# work with the Mesos core team to make it unnecessary.
+class ExecutorDetector(object):
+  class Error(Exception): pass
+  class CannotFindRoot(Error): pass
+
+  LOG_PATH = 'executor_logs'
+  RESOURCE_PATH = 'resource_usage.recordio'
+  VARS_PATH = 'executor_vars.json'
+  PATTERN = [
+      '%(root)s',
+      'slaves',
+      '%(slave_id)s',
+      'frameworks',
+      '%(framework_id)s',
+      'executors',
+      '%(executor_id)s',
+      'runs',
+      '%(run)s']
+  EXTRACTOR = ScanfParser(os.path.join(*PATTERN))
+
+  @classmethod
+  def find_root(cls, path):
+    """Does this path appear to match the executor directory pattern?"""
+
+    def root_from_path(path):
+      path = os.path.normpath(path)
+      path_vector = path.split(os.path.sep)
+      pattern_vector = cls.PATTERN
+      if len(path_vector) < len(pattern_vector):
+        return None
+      for pattern, path_component in zip(reversed(pattern_vector), reversed(path_vector)):
+        if pattern.startswith('%'):
+          continue
+        if path_component != pattern:
+          return None
+      matched_path = os.path.join(*path_vector[-len(pattern_vector) + 1:])
+      return os.path.normpath(path[:-len(matched_path)])
+
+    while path != os.path.dirname(path):
+      root = root_from_path(path)
+      if root:
+        return root
+      path = os.path.dirname(path)
+
+  @classmethod
+  def match(cls, path):
+    try:
+      return cls.EXTRACTOR.parse(path)
+    except ScanfParser.ParseError:
+      return None
+
+  @classmethod
+  def path(cls, result):
+    return os.path.join(*cls.PATTERN) % result.groups()
+
+  @classmethod
+  def find(cls, root, slave_id='*', framework_id='*', executor_id='*', run='*'):
+    mixins = dict(
+        root=root, slave_id=slave_id, framework_id=framework_id, executor_id=executor_id, run=run)
+    return filter(None, map(cls.match, glob(os.path.join(*cls.PATTERN) % mixins)))
+
+  def __init__(self, root=None):
+    self.root = root or self.find_root(os.getcwd())
+    if self.root is None:
+      raise self.CannotFindRoot('Not a valid executor root!')
+
+  def __iter__(self):
+    for extraction in self.find(root=self.root):
+      yield extraction
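
A sketch of find_root() against a path laid out like PATTERN (the path itself is made up):

  path = '/var/lib/mesos/slaves/S0/frameworks/F0/executors/thermos-1/runs/latest'
  print(ExecutorDetector.find_root(path))   # -> /var/lib/mesos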

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/executor_vars.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/executor_vars.py b/src/main/python/apache/aurora/executor/executor_vars.py
new file mode 100644
index 0000000..7f54d8f
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/executor_vars.py
@@ -0,0 +1,123 @@
+import os
+import time
+
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.metrics import (
+    LambdaGauge,
+    MutatorGauge,
+    NamedGauge,
+    Observable)
+from twitter.common.python.dirwrapper import PythonDirectoryWrapper
+from twitter.common.python.pex import PexInfo
+from twitter.common.quantity import Amount, Time
+from twitter.common.string.scanf import ScanfParser
+
+import psutil
+
+
+class ExecutorVars(Observable, ExceptionalThread):
+  """
+    Executor exported /vars wrapper.
+
+    Currently writes to disk and communicates through the Aurora Observer,
+    pending MESOS-433.
+  """
+  MUTATOR_METRICS = ('rss', 'cpu', 'thermos_pss', 'thermos_cpu')
+  RELEASE_TAG_FORMAT = ScanfParser('%(project)s_R%(release)d')
+  DEPLOY_TAG_FORMAT = ScanfParser('%(project)s_%(environment)s_%(release)d_R%(deploy)d')
+  PROJECT_NAMES = ('thermos', 'thermos_executor')
+  COLLECTION_INTERVAL = Amount(1, Time.MINUTES)
+
+  @classmethod
+  def get_release_from_tag(cls, tag):
+    def parse_from(parser):
+      try:
+        scanf = parser.parse(tag)
+        if scanf and scanf.project in cls.PROJECT_NAMES:
+          return scanf.release
+      except ScanfParser.ParseError:
+        pass
+    release = parse_from(cls.RELEASE_TAG_FORMAT)
+    if release is None:
+      release = parse_from(cls.DEPLOY_TAG_FORMAT)
+    if release is None:
+      release = 'UNKNOWN'
+    return release
+
+  @classmethod
+  def get_release_from_binary(cls, binary):
+    try:
+      pex_info = PexInfo.from_pex(PythonDirectoryWrapper.get(binary))
+      return cls.get_release_from_tag(pex_info.build_properties.get('tag', ''))
+    except PythonDirectoryWrapper.Error:
+      return 'UNKNOWN'
+
+  def __init__(self, clock=time):
+    self._clock = clock
+    self._self = psutil.Process(os.getpid())
+    if hasattr(self._self, 'getcwd'):
+      self._version = self.get_release_from_binary(
+        os.path.join(self._self.getcwd(), self._self.cmdline[1]))
+    else:
+      self._version = 'UNKNOWN'
+    self.metrics.register(NamedGauge('version', self._version))
+    self._orphan = False
+    self.metrics.register(LambdaGauge('orphan', lambda: int(self._orphan)))
+    self._metrics = dict((metric, MutatorGauge(metric, 0)) for metric in self.MUTATOR_METRICS)
+    for metric in self._metrics.values():
+      self.metrics.register(metric)
+    ExceptionalThread.__init__(self)
+    self.daemon = True
+
+  def write_metric(self, metric, value):
+    self._metrics[metric].write(value)
+
+  @classmethod
+  def thermos_children(cls, parent):
+    try:
+      for child in parent.get_children():
+        yield child  # thermos_runner
+        try:
+          for grandchild in child.get_children():
+            yield grandchild  # thermos_coordinator
+        except psutil.Error:
+          continue
+    except psutil.Error:
+      return
+
+  @classmethod
+  def aggregate_memory(cls, process, attribute='pss'):
+    try:
+      return sum(getattr(mmap, attribute) for mmap in process.get_memory_maps())
+    except (psutil.Error, AttributeError):
+      # psutil on OS X does not support get_memory_maps
+      return 0
+
+  @classmethod
+  def cpu_rss_pss(cls, process):
+    return (process.get_cpu_percent(0),
+            process.get_memory_info().rss,
+            cls.aggregate_memory(process, attribute='pss'))
+
+  def run(self):
+    while True:
+      self._clock.sleep(self.COLLECTION_INTERVAL.as_(Time.SECONDS))
+      self.sample()
+
+  def sample(self):
+    try:
+      executor_cpu, executor_rss, _ = self.cpu_rss_pss(self._self)
+      self.write_metric('cpu', executor_cpu)
+      self.write_metric('rss', executor_rss)
+      self._orphan = self._self.ppid == 1
+    except psutil.Error:
+      return False
+
+    try:
+      child_stats = map(self.cpu_rss_pss, self.thermos_children(self._self))
+      self.write_metric('thermos_cpu', sum(stat[0] for stat in child_stats))
+      self.write_metric('thermos_pss', sum(stat[2] for stat in child_stats))
+    except psutil.Error:
+      pass
+
+    return True
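
A rough sketch of the tag parsing above (assuming ScanfParser anchors on the literal '_R' separator):

  print(ExecutorVars.get_release_from_tag('thermos_R42'))     # -> 42
  print(ExecutorVars.get_release_from_tag('not-a-real-tag'))  # -> 'UNKNOWN'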

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/gc_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py
new file mode 100644
index 0000000..f17d0a2
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/gc_executor.py
@@ -0,0 +1,499 @@
+"""Thermos garbage-collection (GC) executor
+
+This module contains the Thermos GC executor, responsible for garbage collecting old tasks and
+reconciling task states with the Mesos scheduler. It is intended to be run periodically on Mesos
+slaves utilising the Thermos executor.
+
+"""
+
+import os
+import threading
+import time
+
+from twitter.common.collections import OrderedDict
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.metrics import Observable
+from twitter.common.metrics.gauge import AtomicGauge
+from twitter.common.quantity import Amount, Time
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.core.inspector import CheckpointInspector
+from twitter.thermos.core.helper import TaskKiller
+from twitter.thermos.monitoring.detector import TaskDetector
+from twitter.thermos.monitoring.garbage import TaskGarbageCollector
+
+from gen.twitter.aurora.comm.ttypes import (
+    AdjustRetainedTasks,
+    DeletedTasks,
+    SchedulerMessage)
+from gen.twitter.aurora.ttypes import ScheduleStatus
+
+from .common.sandbox import DirectorySandbox, SandboxInterface
+from .executor_base import ThermosExecutorBase
+from .executor_detector import ExecutorDetector
+
+import mesos_pb2 as mesos_pb
+import psutil
+from thrift.TSerialization import deserialize as thrift_deserialize
+from thrift.TSerialization import serialize as thrift_serialize
+
+
+class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
+  """
+    Thermos GC Executor, responsible for:
+      - garbage collecting old tasks to make sure they don't clutter up the system
+      - state reconciliation with the scheduler (in case it thinks we're running
+        something we're not or vice versa.)
+  """
+  MAX_PID_TIME_DRIFT = Amount(10, Time.SECONDS)
+  MAX_CHECKPOINT_TIME_DRIFT = Amount(1, Time.HOURS)  # maximum runner disconnection time
+
+  # how old a task must be before we're willing to kill it, assuming that there could be
+  # slight races in the following scenario:
+  #    launch gc with retained_tasks={t1, t2, t3}
+  #    launch task t4
+  MINIMUM_KILL_AGE = Amount(10, Time.MINUTES)
+
+  # wait time between checking for new GC events from the slave and/or cleaning orphaned tasks
+  POLL_WAIT = Amount(5, Time.MINUTES)
+
+  # maximum amount of time the executor will wait with no tasks before it exits.
+  MAXIMUM_EXECUTOR_WAIT = Amount(15, Time.MINUTES)
+
+  # maximum lifetime of this executor.  this is to prevent older GC executor binaries from
+  # running forever
+  MAXIMUM_EXECUTOR_LIFETIME = Amount(1, Time.DAYS)
+
+  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
+
+  def __init__(self,
+               checkpoint_root,
+               verbose=True,
+               task_killer=TaskKiller,
+               executor_detector=ExecutorDetector,
+               task_garbage_collector=TaskGarbageCollector,
+               clock=time):
+    ThermosExecutorBase.__init__(self)
+    ExceptionalThread.__init__(self)
+    self.daemon = True
+    self._stop_event = threading.Event()
+    self._gc_task_queue = OrderedDict() # mapping of task_id => (TaskInfo, AdjustRetainedTasks), in
+                                        # the order in which they were received via a launchTask.
+    self._driver = None   # cache the ExecutorDriver provided by the slave, so we can use it
+                          # out of band from slave-initiated callbacks. This should be supplied by
+                          # ThermosExecutorBase.registered() when the executor first registers with
+                          # the slave.
+    self._slave_id = None # cache the slave ID provided by the slave
+    self._task_id = None  # the task_id currently being executed by the ThermosGCExecutor, if any
+    self._start_time = None # the start time of a task currently being executed, if any
+    self._detector = executor_detector()
+    self._collector = task_garbage_collector(root=checkpoint_root)
+    self._clock = clock
+    self._task_killer = task_killer
+    self._checkpoint_root = checkpoint_root
+    self._dropped_tasks = AtomicGauge('dropped_tasks')
+    self.metrics.register(self._dropped_tasks)
+
+  def _runner_ckpt(self, task_id):
+    """Return the runner checkpoint file for a given task_id."""
+    return TaskPath(root=self._checkpoint_root, task_id=task_id).getpath('runner_checkpoint')
+
+  def _terminate_task(self, task_id, kill=True):
+    """Terminate a task using the associated task killer. Returns a boolean indicating success."""
+    killer = self._task_killer(task_id, self._checkpoint_root)
+    self.log('Terminating %s...' % task_id)
+    runner_terminate = killer.kill if kill else killer.lose
+    try:
+      runner_terminate(force=True)
+      return True
+    except Exception as e:
+      self.log('Could not terminate: %s' % e)
+      return False
+
+  def partition_tasks(self):
+    """Return active/finished tasks as discovered from the checkpoint root."""
+    detector = TaskDetector(root=self._checkpoint_root)
+    active_tasks = set(t_id for _, t_id in detector.get_task_ids(state='active'))
+    finished_tasks = set(t_id for _, t_id in detector.get_task_ids(state='finished'))
+    return active_tasks, finished_tasks
+
+  def get_states(self, task_id):
+    """Returns the (timestamp, status) tuples of the task or [] if could not replay."""
+    statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task_id))
+    try:
+      return [(state.timestamp_ms / 1000.0, state.state) for state in statuses]
+    except CheckpointDispatcher.ErrorRecoveringState:
+      return []
+
+  def get_sandbox(self, task_id):
+    """Returns the sandbox of the task, or None if it has not yet been initialized."""
+    try:
+      for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task_id)):
+        if update.runner_header and update.runner_header.sandbox:
+          return update.runner_header.sandbox
+    except CheckpointDispatcher.ErrorRecoveringState:
+      return None
+
+  def maybe_terminate_unknown_task(self, task_id):
+    """Terminate a task if we believe the scheduler doesn't know about it.
+
+       It's possible for the scheduler to queue a GC and launch a task afterwards, in which
+       case we may see actively running tasks that the scheduler did not report in the
+       AdjustRetainedTasks message.
+
+       Returns:
+         boolean indicating whether the task was terminated
+    """
+    states = self.get_states(task_id)
+    if states:
+      task_start_time, _ = states[0]
+      if self._start_time - task_start_time > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
+        return self._terminate_task(task_id)
+    return False
+
+  def should_gc_task(self, task_id):
+    """Check if a possibly-corrupt task should be locally GCed
+
+      A task should be GCed if its checkpoint stream appears to be corrupted and the kill age
+      threshold is exceeded.
+
+       Returns:
+         set, containing the task_id if it should be marked for local GC, or empty otherwise
+    """
+    runner_ckpt = self._runner_ckpt(task_id)
+    if not os.path.exists(runner_ckpt):
+      return set()
+    latest_update = os.path.getmtime(runner_ckpt)
+    if self._start_time - latest_update > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
+      self.log('Got corrupt checkpoint file for %s - marking for local GC' % task_id)
+      return set([task_id])
+    else:
+      self.log('Checkpoint file unreadable, but not yet beyond MINIMUM_KILL_AGE threshold')
+      return set()
+
+  def reconcile_states(self, driver, retained_tasks):
+    """Reconcile states that the scheduler thinks tasks are in vs what they really are in.
+
+        Local    vs   Scheduler  => Action
+       ===================================
+        ACTIVE         ACTIVE    => no-op
+        ACTIVE        STARTING   => no-op
+        ACTIVE        TERMINAL   => maybe kill task*
+        ACTIVE        !EXISTS    => maybe kill task*
+       TERMINAL        ACTIVE    => send actual status**
+       TERMINAL       STARTING   => send actual status**
+       TERMINAL       TERMINAL   => no-op
+       TERMINAL       !EXISTS    => gc locally
+       !EXISTS         ACTIVE    => send LOST**
+       !EXISTS        STARTING   => no-op
+       !EXISTS        TERMINAL   => gc remotely
+
+       * - Only kill if this does not appear to be a race condition.
+       ** - These appear to have no effect
+
+       Side effecting operations:
+         ACTIVE   | (TERMINAL / !EXISTS) => maybe kill
+         TERMINAL | !EXISTS              => delete
+         !EXISTS  | TERMINAL             => delete
+
+      Returns tuple of (local_gc, remote_gc, updates), where:
+        local_gc - set of task_ids to be GCed locally
+        remote_gc - set of task_ids to be deleted on the scheduler
+        updates - dictionary of updates sent to the scheduler (task_id: ScheduleStatus)
+    """
+    def partition(rt):
+      active, starting, finished = set(), set(), set()
+      for task_id, schedule_status in rt.items():
+        if self.twitter_status_is_terminal(schedule_status):
+          finished.add(task_id)
+        elif (schedule_status == ScheduleStatus.STARTING or
+              schedule_status == ScheduleStatus.ASSIGNED):
+          starting.add(task_id)
+        else:
+          active.add(task_id)
+      return active, starting, finished
+
+    local_active, local_finished = self.partition_tasks()
+    sched_active, sched_starting, sched_finished = partition(retained_tasks)
+    local_task_ids = local_active | local_finished
+    sched_task_ids = sched_active | sched_starting | sched_finished
+    all_task_ids = local_task_ids | sched_task_ids
+
+    self.log('Told to retain the following task ids:')
+    for task_id, schedule_status in retained_tasks.items():
+      self.log('  => %s as %s' % (task_id, ScheduleStatus._VALUES_TO_NAMES[schedule_status]))
+
+    self.log('Local active tasks:')
+    for task_id in local_active:
+      self.log('  => %s' % task_id)
+
+    self.log('Local finished tasks:')
+    for task_id in local_finished:
+      self.log('  => %s' % task_id)
+
+    local_gc, remote_gc = set(), set()
+    updates = {}
+
+    for task_id in all_task_ids:
+      if task_id in local_active and task_id not in (sched_active | sched_starting):
+        self.log('Inspecting task %s for termination.' % task_id)
+        if not self.maybe_terminate_unknown_task(task_id):
+          local_gc.update(self.should_gc_task(task_id))
+      if task_id in local_finished and task_id not in sched_task_ids:
+        self.log('Queueing task %s for local deletion.' % task_id)
+        local_gc.add(task_id)
+      if task_id in local_finished and task_id in (sched_active | sched_starting):
+        self.log('Task %s finished but scheduler thinks active/starting.' % task_id)
+        states = self.get_states(task_id)
+        if states:
+          _, last_state = states[-1]
+          updates[task_id] = self.THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN)
+          self.send_update(
+              driver,
+              task_id,
+              self.THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST),
+              'Task finish detected by GC executor.')
+        else:
+          local_gc.update(self.should_gc_task(task_id))
+      if task_id in sched_finished and task_id not in local_task_ids:
+        self.log('Queueing task %s for remote deletion.' % task_id)
+        remote_gc.add(task_id)
+      if task_id not in local_task_ids and task_id in sched_active:
+        self.log('Know nothing about task %s, telling scheduler of LOSS.' % task_id)
+        updates[task_id] = ScheduleStatus.LOST
+        self.send_update(driver, task_id, mesos_pb.TASK_LOST, 'GC executor found no trace of task.')
+      if task_id not in local_task_ids and task_id in sched_starting:
+        self.log('Know nothing about task %s, but scheduler says STARTING - passing' % task_id)
+
+    return local_gc, remote_gc, updates
+
+  def clean_orphans(self, driver):
+    """Inspect checkpoints for trees that have been kill -9'ed but not properly cleaned up."""
+    self.log('Checking for orphaned tasks')
+    active_tasks, _ = self.partition_tasks()
+    updates = {}
+
+    inspector = CheckpointInspector(self._checkpoint_root)
+
+    def is_our_process(process, uid, timestamp):
+      if process.uids.real != uid:
+        return False
+      estimated_start_time = self._clock.time() - process.create_time
+      return abs(timestamp - estimated_start_time) < self.MAX_PID_TIME_DRIFT.as_(Time.SECONDS)
+
+    for task_id in active_tasks:
+      self.log('Inspecting running task: %s' % task_id)
+      inspection = inspector.inspect(task_id)
+      if not inspection:
+        self.log('  - Error inspecting task runner')
+        continue
+      latest_runner = inspection.runner_processes[-1]
+      # Assume that it has not yet started?
+      if not latest_runner:
+        self.log('  - Task has no registered runners.')
+        continue
+      runner_pid, runner_uid, timestamp_ms = latest_runner
+      try:
+        runner_process = psutil.Process(runner_pid)
+        if is_our_process(runner_process, runner_uid, timestamp_ms / 1000.0):
+          self.log('  - Runner appears healthy.')
+          continue
+      except psutil.NoSuchProcess:
+        # Runner is dead
+        pass
+      except psutil.Error as err:
+        self.log('  - Error sampling runner process [pid=%s]: %s' % (runner_pid, err))
+        continue
+      try:
+        latest_update = os.path.getmtime(self._runner_ckpt(task_id))
+      except (IOError, OSError) as err:
+        self.log('  - Error accessing runner ckpt: %s' % err)
+        continue
+      if self._clock.time() - latest_update < self.MAX_CHECKPOINT_TIME_DRIFT.as_(Time.SECONDS):
+        self.log('  - Runner is dead but under LOST threshold.')
+        continue
+      self.log('  - Runner is dead but beyond LOST threshold: %.1fs' % (
+          self._clock.time() - latest_update))
+      if self._terminate_task(task_id, kill=False):
+        updates[task_id] = ScheduleStatus.LOST
+        self.send_update(
+            driver, task_id, mesos_pb.TASK_LOST, 'GC executor detected failed task runner.')
+
+    return updates
+
+  def _erase_sandbox(self, task_id):
+    # TODO(wickman) Only mesos should be in the business of garbage collecting sandboxes.
+    header_sandbox = self.get_sandbox(task_id)
+    directory_sandbox = DirectorySandbox(header_sandbox) if header_sandbox else None
+    if directory_sandbox and directory_sandbox.exists():
+      self.log('Destroying DirectorySandbox for %s' % task_id)
+      directory_sandbox.destroy()
+    else:
+      self.log('Found no sandboxes for %s' % task_id)
+
+  def _gc(self, task_id):
+    """Erase the sandbox, logs and metadata of the given task."""
+    self.log('Erasing sandbox for %s' % task_id)
+    self._erase_sandbox(task_id)
+    self.log('Erasing logs for %s' % task_id)
+    self._collector.erase_logs(task_id)
+    self.log('Erasing metadata for %s' % task_id)
+    self._collector.erase_metadata(task_id)
+
+  def garbage_collect(self, force_delete=frozenset()):
+    """Garbage collect tasks on the system no longer active or in the supplied force_delete.
+
+    Return a set of task_ids representing the tasks that were garbage collected.
+    """
+    active_tasks, finished_tasks = self.partition_tasks()
+    retained_executors = set(iter(self.linked_executors))
+    self.log('Executor sandboxes retained by Mesos:')
+    if retained_executors:
+      for r_e in sorted(retained_executors):
+        self.log('  %s' % r_e)
+    else:
+      self.log('  None')
+    for task_id in (active_tasks - retained_executors):
+      self.log('ERROR: Active task %s had its executor sandbox pulled.' % task_id)
+    gc_tasks = (finished_tasks - retained_executors) | force_delete
+    for gc_task in gc_tasks:
+      self._gc(gc_task)
+    return gc_tasks
+
+  @property
+  def linked_executors(self):
+    """Generator yielding the executor sandboxes detected on the system."""
+    thermos_executor_prefix = 'thermos-'
+    for executor in self._detector:
+      # It's possible for just the 'latest' symlink to be present but no run directories.
+      # This indicates that the task has been fully garbage collected.
+      if executor.executor_id.startswith(thermos_executor_prefix) and executor.run != 'latest':
+        yield executor.executor_id[len(thermos_executor_prefix):]
+
+  def _run_gc(self, task, retain_tasks, retain_start):
+    """
+      Reconcile the set of tasks to retain (provided by the scheduler) with the current state of
+      executors on this system. Garbage collect tasks/executors as appropriate.
+
+      Not re-entrant! Previous executions must complete (and clear self._task_id) before this can be
+      invoked.
+
+      Potentially blocking (e.g. on I/O) in self.garbage_collect()
+
+      Args:
+        task: TaskInfo provided by the slave
+        retain_tasks: mapping of task_id => ScheduleStatus, describing what the scheduler thinks is
+                      running on this system
+        retain_start: the time at which the retain_tasks message is effective -- this means that
+                      tasks started after the retain_tasks message is effective are skipped
+                      until future GC runs.
+    """
+    task_id = task.task_id.value
+    if self._task_id is not None:
+      raise RuntimeError('_run_gc() called [task_id=%s], but already running [task_id=%s]'
+                         % (task_id, self._task_id))
+    self._task_id = task_id
+    self.log('Launching garbage collection [task_id=%s]' % task_id)
+    self._start_time = retain_start
+    local_gc, remote_gc, _ = self.reconcile_states(self._driver, retain_tasks)
+    deleted_tasks = set(retain_tasks).intersection(self.garbage_collect(local_gc)) | remote_gc
+    if deleted_tasks:
+      self._driver.sendFrameworkMessage(thrift_serialize(
+          SchedulerMessage(deletedTasks=DeletedTasks(taskIds=deleted_tasks))))
+    self.send_update(
+        self._driver, task.task_id.value, mesos_pb.TASK_FINISHED, 'Garbage collection finished.')
+    self.log('Garbage collection complete [task_id=%s]' % task_id)
+    self._task_id = self._start_time = None
+
+  def run(self):
+    """Main GC executor event loop.
+
+      Periodically perform state reconciliation with the set of tasks provided
+      by the slave, and garbage collect orphaned tasks on the system.
+    """
+    run_start = self._clock.time()
+    last_gc_run = self._clock.time()
+
+    def should_terminate():
+      now = self._clock.time()
+      if now > run_start + self.MAXIMUM_EXECUTOR_LIFETIME.as_(Time.SECONDS):
+        return True
+      if now > last_gc_run + self.MAXIMUM_EXECUTOR_WAIT.as_(Time.SECONDS):
+        return True
+      return self._stop_event.is_set()
+
+    while not should_terminate():
+      try:
+        _, (task, retain_tasks, retain_start) = self._gc_task_queue.popitem(0)
+        last_gc_run = retain_start
+        self._run_gc(task, retain_tasks, retain_start)
+      except KeyError: # no enqueued GC tasks
+        pass
+      if self._driver is not None:
+        self.clean_orphans(self._driver)
+      self._stop_event.wait(self.POLL_WAIT.as_(Time.SECONDS))
+
+    # shutdown
+    if self._driver is not None:
+      try:
+        prev_task_id, _ = self._gc_task_queue.popitem(0)
+      except KeyError: # no enqueued GC tasks
+        pass
+      else:
+        self.send_update(self._driver, prev_task_id, mesos_pb.TASK_FINISHED,
+                         'Garbage collection skipped - GC executor shutting down')
+        # TODO(jon) Remove this once external MESOS-243 is resolved.
+        self.log('Sleeping briefly to mitigate https://issues.apache.org/jira/browse/MESOS-243')
+        self._clock.sleep(self.PERSISTENCE_WAIT.as_(Time.SECONDS))
+
+      self._driver.stop()
+
+  """ Mesos Executor API methods follow """
+
+  def launchTask(self, driver, task):
+    """Queue a new garbage collection run, and drop any currently-enqueued runs."""
+    if self._slave_id is None:
+      self._slave_id = task.slave_id.value
+    task_id = task.task_id.value
+    self.log('launchTask() got task_id: %s' % task_id)
+    if task_id == self._task_id:
+      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
+      return
+    elif task_id in self._gc_task_queue:
+      self.log('=> Already have task_id %s queued - ignoring' % task_id)
+      return
+    try:
+      art = thrift_deserialize(AdjustRetainedTasks(), task.data)
+    except Exception as err:
+      self.log('Error deserializing task: %s' % err)
+      self.send_update(
+          self._driver, task_id, mesos_pb.TASK_FAILED, 'Deserialization of GC task failed')
+      return
+    try:
+      prev_task_id, _ = self._gc_task_queue.popitem(0)
+    except KeyError: # no enqueued GC tasks - reset counter
+      self._dropped_tasks.write(0)
+    else:
+      self.log('=> Dropping previously queued GC with task_id %s' % prev_task_id)
+      self._dropped_tasks.increment()
+      self.log('=> Updating scheduler')
+      self.send_update(self._driver, prev_task_id, mesos_pb.TASK_FINISHED,
+                       'Garbage collection skipped - GC executor received another task')
+    self.log('=> Adding %s to GC queue' % task_id)
+    self._gc_task_queue[task_id] = (task, art.retainedTasks, self._clock.time())
+
+  def killTask(self, driver, task_id):
+    """Remove the specified task from the queue, if it's not yet run. Otherwise, no-op."""
+    self.log('killTask() got task_id: %s' % task_id)
+    task = self._gc_task_queue.pop(task_id, None)
+    if task is not None:
+      self.log('=> Removed %s from queued GC tasks' % task_id)
+    elif task_id == self._task_id:
+      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
+    else:
+      self.log('=> Unknown task_id %s - ignoring' % task_id)
+
+  def shutdown(self, driver):
+    """Trigger the Executor to shut down as soon as the current GC run is finished."""
+    self.log('shutdown() called - setting stop event')
+    self._stop_event.set()
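
A sketch of how the GC executor might be wired to a driver.  This only works from within a real executor sandbox on a slave (ExecutorDetector() must be able to locate the Mesos root), and the checkpoint root shown is an assumption; the driver class is the standard Mesos Python binding:

  executor = ThermosGCExecutor(checkpoint_root='/var/run/thermos')
  executor.start()                              # starts the GC event loop thread
  driver = mesos.MesosExecutorDriver(executor)  # delivers launchTask/killTask/shutdown callbacks
  driver.run()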

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/status_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py
new file mode 100644
index 0000000..c974597
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/status_manager.py
@@ -0,0 +1,39 @@
+import time
+
+from twitter.common import log
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.quantity import Amount, Time
+
+from .common.status_checker import StatusChecker
+
+
+class StatusManager(ExceptionalThread):
+  """
+    An agent that periodically checks the health of a task via StatusCheckers that
+    provide HTTP health checking, resource consumption, etc.
+
+    If any of the status interfaces return a status, the Status Manager
+    invokes the user-supplied callback with the status.
+  """
+  POLL_WAIT = Amount(500, Time.MILLISECONDS)
+
+  def __init__(self, status_checker, callback, clock=time):
+    if not isinstance(status_checker, StatusChecker):
+      raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker))
+    if not callable(callback):
+      raise TypeError('callback needs to be callable!')
+    self._status_checker = status_checker
+    self._callback = callback
+    self._clock = clock
+    super(StatusManager, self).__init__()
+    self.daemon = True
+
+  def run(self):
+    while True:
+      status_result = self._status_checker.status
+      if status_result is not None:
+        log.info('Status manager got %s' % status_result)
+        self._callback(status_result)
+        break
+      else:
+        self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))
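
A sketch pairing the StatusManager with the KillManager from this commit; the callback is a stand-in for the executor's real termination routine:

  def on_status(result):
    print('terminating: %s' % result)

  km = KillManager()
  manager = StatusManager(km, on_status)
  manager.start()
  km.kill('operator requested teardown')   # the poll loop sees the status and fires the callback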

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_executor.py b/src/main/python/apache/aurora/executor/thermos_executor.py
new file mode 100644
index 0000000..52b514a
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/thermos_executor.py
@@ -0,0 +1,289 @@
+import os
+import threading
+import time
+import traceback
+
+from twitter.common import log
+from twitter.common.concurrent import deadline, defer, Timeout
+from twitter.common.metrics import Observable
+from twitter.common.quantity import Amount, Time
+
+from .common.kill_manager import KillManager
+from .common.sandbox import DirectorySandbox, SandboxProvider
+from .common.status_checker import ChainedStatusChecker, ExitState
+from .common.task_info import (
+    assigned_task_from_mesos_task,
+    mesos_task_instance_from_assigned_task,
+    resolve_ports,
+)
+from .common.task_runner import (
+    TaskError,
+    TaskRunner,
+    TaskRunnerProvider,
+)
+from .executor_base import ThermosExecutorBase
+from .status_manager import StatusManager
+
+import mesos_pb2 as mesos_pb
+
+
+class DefaultSandboxProvider(SandboxProvider):
+  SANDBOX_NAME = 'sandbox'
+
+  def from_assigned_task(self, assigned_task):
+    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    return DirectorySandbox(
+        os.path.realpath(self.SANDBOX_NAME),
+        mesos_task.role().get())
+
+
+class ThermosExecutor(Observable, ThermosExecutorBase):
+  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
+  SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES)
+  START_TIMEOUT = Amount(2, Time.MINUTES)
+  STOP_TIMEOUT = Amount(2, Time.MINUTES)
+  STOP_WAIT = Amount(5, Time.SECONDS)
+
+  def __init__(self,
+               runner_provider,
+               status_manager_class=StatusManager,
+               sandbox_provider=DefaultSandboxProvider,
+               status_providers=(),
+               clock=time):
+
+    ThermosExecutorBase.__init__(self)
+    if not isinstance(runner_provider, TaskRunnerProvider):
+      raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' %
+          type(runner_provider))
+    self._runner = None
+    self._runner_provider = runner_provider
+    self._clock = clock
+    self._task_id = None
+    self._status_providers = status_providers
+    self._status_manager = None
+    self._status_manager_class = status_manager_class
+    self._sandbox = None
+    self._sandbox_provider = sandbox_provider()
+    self._kill_manager = KillManager()
+    # Events that are exposed for interested entities
+    self.runner_aborted = threading.Event()
+    self.runner_started = threading.Event()
+    self.sandbox_initialized = threading.Event()
+    self.sandbox_created = threading.Event()
+    self.terminated = threading.Event()
+    self.launched = threading.Event()
+
+  @property
+  def runner(self):
+    return self._runner
+
+  def _die(self, driver, status, msg):
+    log.fatal(msg)
+    self.send_update(driver, self._task_id, status, msg)
+    defer(driver.stop, delay=self.STOP_WAIT)
+
+  def _run(self, driver, assigned_task, mesos_task):
+    """
+      Commence running a Task.
+        - Initialize the sandbox
+        - Start the ThermosTaskRunner (fork the Thermos TaskRunner)
+        - Set up necessary HealthCheckers
+        - Set up DiscoveryManager, if applicable
+        - Set up ResourceCheckpointer
+        - Set up StatusManager, and attach HealthCheckers
+    """
+    self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.')
+
+    if not self._initialize_sandbox(driver, assigned_task):
+      return
+
+    # Fully resolve the portmap
+    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
+
+    # start the process on a separate thread and give the message processing thread back
+    # to the driver
+    try:
+      self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox)
+    except TaskError as e:
+      self._die(driver, mesos_pb.TASK_FAILED, str(e))
+      return
+
+    if not isinstance(self._runner, TaskRunner):
+      self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!')
+      return
+
+    if not self._start_runner(driver, assigned_task, mesos_task, portmap):
+      return
+
+    self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING)
+
+    self._start_status_manager(driver, assigned_task, mesos_task, portmap)
+
+  def _initialize_sandbox(self, driver, assigned_task):
+    self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)
+    self.sandbox_initialized.set()
+    try:
+      deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT,
+               daemon=True, propagate=True)
+    except Timeout:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!')
+      return False
+    except self._sandbox.Error as e:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e)
+      return False
+    self.sandbox_created.set()
+    return True
+
+  def _start_runner(self, driver, assigned_task, mesos_task, portmap):
+    if self.runner_aborted.is_set():
+      self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.')
+      return False
+
+    try:
+      deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True)
+    except TaskError as e:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e)
+      return False
+    except Timeout:
+      self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!')
+      return False
+
+    self.runner_started.set()
+    log.debug('Task started.')
+
+    return True
+
+  def _start_status_manager(self, driver, assigned_task, mesos_task, portmap):
+    status_checkers = [self._kill_manager]
+    self.metrics.register_observable('kill_manager', self._kill_manager)
+
+    for status_provider in self._status_providers:
+      status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
+      if status_checker is None:
+        continue
+      status_checkers.append(status_checker)
+      # TODO: register each status_checker with self.metrics.register_observable().
+
+    self._chained_checker = ChainedStatusChecker(status_checkers)
+    self._chained_checker.start()
+
+    # chain the runner to the other checkers, but do not chain .start()/.stop()
+    complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
+    self._status_manager = self._status_manager_class(
+        complete_checker, self._shutdown, clock=self._clock)
+    self._status_manager.start()
+
+  def _signal_kill_manager(self, driver, task_id, reason):
+    if self._task_id is None:
+      log.error('Was asked to kill task but no task running!')
+      return
+    if task_id != self._task_id:
+      log.error('Asked to kill a task other than what we are running!')
+      return
+    if not self.sandbox_created.is_set():
+      log.error('Asked to kill task with incomplete sandbox - aborting runner start')
+      self.runner_aborted.set()
+      return
+    self.log('Activating kill manager.')
+    self._kill_manager.kill(reason)
+
+  @classmethod
+  def translate_exit_state_to_mesos(cls, exit_state):
+    # TODO: consider moving this mapping into executor_base.
+    if exit_state == ExitState.FAILED:
+      return mesos_pb.TASK_FAILED
+    elif exit_state == ExitState.KILLED:
+      return mesos_pb.TASK_KILLED
+    elif exit_state == ExitState.FINISHED:
+      return mesos_pb.TASK_FINISHED
+    elif exit_state == ExitState.LOST:
+      return mesos_pb.TASK_LOST
+    log.error('Unknown exit state %s, defaulting to TASK_FINISHED.' % exit_state)
+    return mesos_pb.TASK_FINISHED
+
+  def _shutdown(self, status_result):
+    runner_status = self._runner.status
+
+    try:
+      deadline(self._runner.stop, timeout=self.STOP_TIMEOUT)
+    except Timeout:
+      log.error('Failed to stop runner within deadline.')
+
+    try:
+      deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT)
+    except Timeout:
+      log.error('Failed to stop all checkers within deadline.')
+
+    # If the runner was alive when _shutdown was called, defer to the status_result,
+    # otherwise the runner's terminal state is the preferred state.
+    exit_status = runner_status or status_result
+
+    self.send_update(
+        self._driver,
+        self._task_id,
+        self.translate_exit_state_to_mesos(exit_status.status),
+        status_result.reason)
+
+    self.terminated.set()
+    defer(self._driver.stop, delay=self.PERSISTENCE_WAIT)
+
+  """ Mesos Executor API methods follow """
+
+  def launchTask(self, driver, task):
+    """
+      Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks).
+      Note that this task can be realized with a thread, a process, or some simple computation;
+      however, no other callbacks will be invoked on this executor until this callback has returned.
+    """
+    self.launched.set()
+    self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value))
+
+    # TODO(wickman)  Update the tests to call registered(), then remove this line and issue
+    # an assert if self._driver is not populated.
+    self._driver = driver
+
+    if self._runner:
+      log.error('Already running a task! %s' % self._task_id)
+      self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST,
+          "Task already running on this executor: %s" % self._task_id)
+      return
+
+    self._slave_id = task.slave_id.value
+    self._task_id = task.task_id.value
+
+    try:
+      assigned_task = assigned_task_from_mesos_task(task)
+      mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    except Exception as e:
+      log.fatal('Could not deserialize AssignedTask')
+      log.fatal(traceback.format_exc())
+      self.send_update(
+          driver, self._task_id, mesos_pb.TASK_FAILED, "Could not deserialize task: %s" % e)
+      defer(driver.stop, delay=self.STOP_WAIT)
+      return
+
+    defer(lambda: self._run(driver, assigned_task, mesos_task))
+
+  def killTask(self, driver, task_id):
+    """
+     Invoked when a task running within this executor has been killed (via
+     SchedulerDriver::killTask). Note that no status update will be sent on behalf of the executor,
+     the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and invoking
+     ExecutorDriver::sendStatusUpdate.
+    """
+    self.log('killTask got task_id: %s' % task_id)
+    self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.")
+    self.log('killTask returned.')
+
+  def shutdown(self, driver):
+    """
+     Invoked when the executor should terminate all of its currently running tasks. Note that after
+     Mesos has determined that an executor has terminated, a TASK_LOST status update will be created
+     for any tasks for which the executor did not send a terminal status update (e.g., TASK_KILLED,
+     TASK_FINISHED, TASK_FAILED).
+    """
+    self.log('shutdown called')
+    if self._task_id:
+      self.log('shutting down %s' % self._task_id)
+      self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.")
+    self.log('shutdown returned')

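The happy path through _run() above reduces to a fixed sequence of status updates. A condensed sketch of that sequence, with FakeUpdater and plain strings standing in for ExecutorDriver/send_update and the mesos_pb constants (none of the names below are Aurora's):

class FakeUpdater(object):
  def __init__(self):
    self.updates = []

  def send_update(self, task_id, state, message=''):
    self.updates.append((task_id, state, message))


def happy_path(updater, task_id):
  updater.send_update(task_id, 'TASK_STARTING', 'Initializing sandbox.')
  # sandbox creation, runner provisioning and runner.start() happen here
  updater.send_update(task_id, 'TASK_RUNNING')
  # the StatusManager later reports a terminal StatusResult, triggering _shutdown()
  updater.send_update(task_id, 'TASK_FINISHED', 'Task finished.')


updater = FakeUpdater()
happy_path(updater, 'task-1')
assert [state for _, state, _ in updater.updates] == ['TASK_STARTING', 'TASK_RUNNING', 'TASK_FINISHED']
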
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/thermos_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_runner.py b/src/main/python/apache/aurora/executor/thermos_runner.py
new file mode 100644
index 0000000..0c9af05
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/thermos_runner.py
@@ -0,0 +1,138 @@
+import functools
+import os
+import signal
+import sys
+import traceback
+
+from twitter.common import app, log
+from twitter.thermos.common.options import add_port_to
+from twitter.thermos.common.planner import TaskPlanner
+from twitter.thermos.config.loader import ThermosConfigLoader
+from twitter.thermos.core.runner import TaskRunner
+
+
+app.add_option(
+    "--thermos_json",
+    dest="thermos_json",
+    default=None,
+    help="read a thermos Task from a serialized json blob")
+
+
+app.add_option(
+    "--sandbox",
+    dest="sandbox",
+    metavar="PATH",
+    default=None,
+    help="the sandbox in which this task should run")
+
+
+app.add_option(
+     "--checkpoint_root",
+     dest="checkpoint_root",
+     metavar="PATH",
+     default=None,
+     help="the path where we will store checkpoints")
+
+
+app.add_option(
+     "--task_id",
+     dest="task_id",
+     metavar="STRING",
+     default=None,
+     help="The id to which this task should be bound, created if it does not exist.")
+
+
+app.add_option(
+     "--setuid",
+     dest="setuid",
+     metavar="USER",
+     default=None,
+     help="setuid tasks to this user, requires superuser privileges.")
+
+
+app.add_option(
+     "--enable_chroot",
+     dest="chroot",
+     default=False,
+     action='store_true',
+     help="chroot tasks to the sandbox before executing them.")
+
+
+app.add_option(
+     "--port",
+     type='string',
+     nargs=1,
+     action='callback',
+     callback=add_port_to('prebound_ports'),
+     dest='prebound_ports',
+     default={},
+     metavar="NAME:PORT",
+     help="bind a numbered port PORT to name NAME")
+
+
+def get_task_from_options(opts):
+  tasks = ThermosConfigLoader.load_json(opts.thermos_json)
+  if len(tasks.tasks()) == 0:
+    app.error("No tasks specified!")
+  if len(tasks.tasks()) > 1:
+    app.error("Multiple tasks in config but no task name specified!")
+  task = tasks.tasks()[0]
+  if not task.task.check().ok():
+    app.error(task.task.check().message())
+  return task
+
+
+def runner_teardown(runner, sig=signal.SIGUSR1, frame=None):
+  """Destroy runner on SIGUSR1 (kill) or SIGUSR2 (lose)"""
+  op = 'kill' if sig == signal.SIGUSR1 else 'lose'
+  log.info('Thermos runner got signal %s, shutting down.' % sig)
+  log.info('Interrupted frame:')
+  if frame:
+    for line in ''.join(traceback.format_stack(frame)).splitlines():
+      log.info(line)
+  runner.close_ckpt()
+  log.info('Calling runner.%s()' % op)
+  getattr(runner, op)()
+  sys.exit(0)
+
+
+class CappedTaskPlanner(TaskPlanner):
+  TOTAL_RUN_LIMIT = 100
+
+
+def proxy_main(args, opts):
+  assert opts.thermos_json and os.path.exists(opts.thermos_json)
+  assert opts.sandbox
+  assert opts.checkpoint_root
+
+  thermos_task = get_task_from_options(opts)
+  prebound_ports = opts.prebound_ports
+  missing_ports = set(thermos_task.ports()) - set(prebound_ports)
+
+  if missing_ports:
+    app.error('ERROR!  Unbound ports: %s' % ' '.join(port for port in missing_ports))
+
+  task_runner = TaskRunner(
+      thermos_task.task,
+      opts.checkpoint_root,
+      opts.sandbox,
+      task_id=opts.task_id,
+      user=opts.setuid,
+      portmap=prebound_ports,
+      chroot=opts.chroot,
+      planner_class=CappedTaskPlanner
+  )
+
+  for sig in (signal.SIGUSR1, signal.SIGUSR2):
+    signal.signal(sig, functools.partial(runner_teardown, task_runner))
+
+  try:
+    task_runner.run()
+  except TaskRunner.InternalError as err:
+    app.error('Internal error: %s' % err)
+  except TaskRunner.InvalidTask as err:
+    app.error(str(err))
+  except TaskRunner.StateError:
+    app.error('Task appears to already be in a terminal state.')
+  except KeyboardInterrupt:
+    runner_teardown(task_runner)

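ThermosTaskRunner._cmdline() in the next file is what normally assembles the flags this module parses. A hypothetical invocation built the same way (all paths, ids, and ports below are made up for illustration):

flags = {
  'thermos_json': '/var/run/thermos/task-1/task.json',  # hypothetical path
  'sandbox': '/var/lib/mesos/sandbox',                   # hypothetical path
  'checkpoint_root': '/var/run/thermos',                 # hypothetical path
  'task_id': 'task-1',                                   # hypothetical id
}
argv = ['thermos_runner.pex'] + ['--%s=%s' % (flag, value) for flag, value in flags.items()]
argv += ['--port=http:31337', '--enable_chroot']
# e.g. ['thermos_runner.pex', '--task_id=task-1', ..., '--port=http:31337', '--enable_chroot']

Once running, the process is torn down via SIGUSR1 (kill) or SIGUSR2 (lose) as wired up in proxy_main() above.
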
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
new file mode 100644
index 0000000..dba99d2
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -0,0 +1,331 @@
+import errno
+import getpass
+import os
+import signal
+import subprocess
+import threading
+import time
+
+from twitter.aurora.common.http_signaler import HttpSignaler
+from twitter.common import log
+from twitter.common.dirutil import chmod_plus_x, safe_mkdtemp
+from twitter.common.log.options import LogOptions
+from twitter.common.quantity import Amount, Time
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.config.loader import ThermosTaskWrapper
+from twitter.thermos.core import runner as core
+from twitter.thermos.monitoring.monitor import TaskMonitor
+
+from gen.twitter.thermos.ttypes import TaskState
+
+from .common.status_checker import ExitState, StatusResult
+from .common.task_info import (
+    mesos_task_instance_from_assigned_task,
+    resolve_ports,
+)
+from .common.task_runner import (
+    TaskError,
+    TaskRunner,
+    TaskRunnerProvider,
+)
+
+
+class ThermosTaskRunner(TaskRunner):
+  ESCALATION_WAIT = Amount(5, Time.SECONDS)
+  EXIT_STATE_MAP = {
+      TaskState.ACTIVE: StatusResult('Runner died while task was active.', ExitState.LOST),
+      TaskState.FAILED: StatusResult('Task failed.', ExitState.FAILED),
+      TaskState.KILLED: StatusResult('Task killed.', ExitState.KILLED),
+      TaskState.LOST: StatusResult('Task lost.', ExitState.LOST),
+      TaskState.SUCCESS: StatusResult('Task finished.', ExitState.FINISHED),
+  }
+  MAX_WAIT = Amount(1, Time.MINUTES)
+  PEX_NAME = 'thermos_runner.pex'
+  POLL_INTERVAL = Amount(500, Time.MILLISECONDS)
+  THERMOS_PREEMPTION_WAIT = Amount(1, Time.MINUTES)
+
+  def __init__(self,
+               runner_pex,
+               task_id,
+               task,
+               role,
+               portmap,
+               sandbox,
+               checkpoint_root=None,
+               artifact_dir=None,
+               clock=time):
+    """
+      runner_pex       location of the thermos_runner pex that this task runner should use
+      task_id          task_id assigned by scheduler
+      task             thermos pystachio Task object
+      role             role to run the task under
+      portmap          { name => port } dictionary
+      sandbox          the sandbox object
+      checkpoint_root  the checkpoint root for the thermos runner
+      artifact_dir     scratch space for the thermos runner (basically cwd of thermos.pex)
+      clock            clock
+    """
+    self._runner_pex = runner_pex
+    self._task_id = task_id
+    self._task = task
+    self._popen = None
+    self._monitor = None
+    self._status = None
+    self._ports = portmap
+    self._root = sandbox.root
+    self._checkpoint_root = checkpoint_root or TaskPath.DEFAULT_CHECKPOINT_ROOT
+    self._enable_chroot = sandbox.chrooted
+    self._role = role
+    self._clock = clock
+    self._artifact_dir = artifact_dir or safe_mkdtemp()
+
+    # wait events
+    self._dead = threading.Event()
+    self._kill_signal = threading.Event()
+    self.forking = threading.Event()
+    self.forked = threading.Event()
+
+    try:
+      with open(os.path.join(self._artifact_dir, 'task.json'), 'w') as fp:
+        self._task_filename = fp.name
+        ThermosTaskWrapper(self._task).to_file(self._task_filename)
+    except ThermosTaskWrapper.InvalidTask as e:
+      raise TaskError('Failed to load task: %s' % e)
+
+  def _terminate_http(self):
+    if 'health' not in self._ports:
+      return
+
+    http_signaler = HttpSignaler(self._ports['health'])
+
+    # pass 1
+    http_signaler.quitquitquit()
+    self._clock.sleep(self.ESCALATION_WAIT.as_(Time.SECONDS))
+    if self.status is not None:
+      return True
+
+    # pass 2
+    http_signaler.abortabortabort()
+    self._clock.sleep(self.ESCALATION_WAIT.as_(Time.SECONDS))
+    if self.status is not None:
+      return True
+
+  @property
+  def artifact_dir(self):
+    return self._artifact_dir
+
+  def task_state(self):
+    return self._monitor.task_state() if self._monitor else None
+
+  @property
+  def is_alive(self):
+    """
+      Is the process underlying the Thermos task runner alive?
+    """
+    if not self._popen:
+      return False
+    if self._dead.is_set():
+      return False
+
+    # N.B. You cannot mix this code and any code that relies upon os.wait
+    # mechanisms with blanket child process collection.  One example is the
+    # Thermos task runner which calls os.wait4 -- without refactoring, you
+    # should not mix a Thermos task runner in the same process as this
+    # thread.
+    try:
+      pid, _ = os.waitpid(self._popen.pid, os.WNOHANG)
+      if pid == 0:
+        return True
+      else:
+        log.info('Detected runner termination: pid=%s' % pid)
+    except OSError as e:
+      log.error('is_alive got OSError: %s' % e)
+      if e.errno != errno.ECHILD:
+        raise
+
+    self._dead.set()
+    return False
+
+  def compute_status(self):
+    if self.is_alive:
+      return None
+    exit_state = self.EXIT_STATE_MAP.get(self.task_state())
+    if exit_state is None:
+      log.error('Received unexpected exit state from TaskMonitor.')
+    return exit_state
+
+  def terminate_runner(self, as_loss=False):
+    """
+      Terminate the underlying runner process, if it exists.
+    """
+    if self._kill_signal.is_set():
+      log.warning('Duplicate kill/lose signal received, ignoring.')
+      return
+    self._kill_signal.set()
+    if self.is_alive:
+      sig = 'SIGUSR2' if as_loss else 'SIGUSR1'
+      log.info('Runner is alive, sending %s' % sig)
+      try:
+        self._popen.send_signal(getattr(signal, sig))
+      except OSError as e:
+        log.error('Got OSError sending %s: %s' % (sig, e))
+    else:
+      log.info('Runner is dead, skipping kill.')
+
+  def kill(self):
+    self.terminate_runner()
+
+  def lose(self):
+    self.terminate_runner(as_loss=True)
+
+  def quitquitquit(self):
+    """Bind to the process tree of a Thermos task and kill it with impunity."""
+    try:
+      runner = core.TaskRunner.get(self._task_id, self._checkpoint_root)
+      if runner:
+        log.info('quitquitquit calling runner.kill')
+        # Right now preemption wait is hardcoded, though it may become configurable in the future.
+        runner.kill(force=True, preemption_wait=self.THERMOS_PREEMPTION_WAIT)
+      else:
+        log.error('Could not instantiate runner!')
+    except core.TaskRunner.Error as e:
+      log.error('Could not quitquitquit runner: %s' % e)
+
+  def _cmdline(self):
+    params = dict(log_dir=LogOptions.log_dir(),
+                  log_to_disk='DEBUG',
+                  checkpoint_root=self._checkpoint_root,
+                  sandbox=self._root,
+                  task_id=self._task_id,
+                  thermos_json=self._task_filename)
+
+    if getpass.getuser() == 'root':
+      params.update(setuid=self._role)
+
+    cmdline_args = [self._runner_pex]
+    cmdline_args.extend('--%s=%s' % (flag, value) for flag, value in params.items())
+    if self._enable_chroot:
+      cmdline_args.extend(['--enable_chroot'])
+    for name, port in self._ports.items():
+      cmdline_args.extend(['--port=%s:%s' % (name, port)])
+    return cmdline_args
+
+  # --- public interface
+  def start(self, timeout=MAX_WAIT):
+    """Fork the task runner and return once the underlying task is running, up to timeout."""
+    self.forking.set()
+
+    try:
+      chmod_plus_x(self._runner_pex)
+    except OSError as e:
+      if e.errno != errno.EPERM:
+        raise TaskError('Failed to chmod +x runner: %s' % e)
+
+    self._monitor = TaskMonitor(TaskPath(root=self._checkpoint_root), self._task_id)
+
+    cmdline_args = self._cmdline()
+    log.info('Forking off runner with cmdline: %s' % ' '.join(cmdline_args))
+
+    try:
+      self._popen = subprocess.Popen(cmdline_args)
+    except OSError as e:
+      raise TaskError(e)
+
+    self.forked.set()
+
+    log.debug('Waiting for task to start.')
+
+    def is_started():
+      return self._monitor and (self._monitor.active or self._monitor.finished)
+
+    waited = Amount(0, Time.SECONDS)
+    while not is_started() and waited < timeout:
+      log.debug('  - sleeping...')
+      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
+      waited += self.POLL_INTERVAL
+
+    if not is_started():
+      log.error('Task did not start within deadline, forcing loss.')
+      self.lose()
+      raise TaskError('Task did not start within deadline.')
+
+  def stop(self, timeout=MAX_WAIT):
+    """Stop the runner.  If it's already completed, no-op.  If it's still running, issue a kill."""
+    log.info('ThermosTaskRunner is shutting down.')
+
+    if not self.forking.is_set():
+      raise TaskError('Failed to call TaskRunner.start.')
+
+    log.info('Invoking runner HTTP teardown.')
+    self._terminate_http()
+
+    log.info('Invoking runner.kill')
+    self.kill()
+
+    waited = Amount(0, Time.SECONDS)
+    while self.is_alive and waited < timeout:
+      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
+      waited += self.POLL_INTERVAL
+
+    if not self.is_alive and self.task_state() != TaskState.ACTIVE:
+      return
+
+    log.info('Thermos task did not shut down cleanly, rebinding to kill.')
+    self.quitquitquit()
+
+    while not self._monitor.finished and waited < timeout:
+      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
+      waited += self.POLL_INTERVAL
+
+    if not self._monitor.finished:
+      raise TaskError('Task did not stop within deadline.')
+
+  @property
+  def status(self):
+    """Return the StatusResult of this task runner.  This returns None as
+       long as no terminal state is reached."""
+    if self._status is None:
+      self._status = self.compute_status()
+    return self._status
+
+
+class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
+  def __init__(self,
+               pex_location,
+               checkpoint_root=None,
+               artifact_dir=None,
+               task_runner_class=ThermosTaskRunner,
+               max_wait=Amount(1, Time.MINUTES),
+               preemption_wait=Amount(1, Time.MINUTES),
+               poll_interval=Amount(500, Time.MILLISECONDS),
+               clock=time):
+    self._artifact_dir = artifact_dir or safe_mkdtemp()
+    self._checkpoint_root = checkpoint_root
+    self._clock = clock
+    self._max_wait = max_wait
+    self._pex_location = pex_location
+    self._poll_interval = poll_interval
+    self._preemption_wait = preemption_wait
+    self._task_runner_class = task_runner_class
+
+  def from_assigned_task(self, assigned_task, sandbox):
+    task_id = assigned_task.taskId
+    role = assigned_task.task.owner.role
+    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    mesos_ports = resolve_ports(mesos_task, assigned_task.assignedPorts)
+
+    class ProvidedThermosTaskRunner(self._task_runner_class):
+      MAX_WAIT = self._max_wait
+      POLL_INTERVAL = self._poll_interval
+      THERMOS_PREEMPTION_WAIT = self._preemption_wait
+
+    return ProvidedThermosTaskRunner(
+        self._pex_location,
+        task_id,
+        mesos_task.task(),
+        role,
+        mesos_ports,
+        sandbox,
+        checkpoint_root=self._checkpoint_root,
+        artifact_dir=self._artifact_dir,
+        clock=self._clock)

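DefaultThermosTaskRunnerProvider tunes per-task timeouts by minting a throwaway subclass whose class attributes shadow the defaults. A minimal, self-contained sketch of that pattern (Base and make_runner are stand-ins, not Aurora classes):

class Base(object):
  MAX_WAIT = 60  # default, analogous to ThermosTaskRunner.MAX_WAIT

  def wait(self):
    return self.MAX_WAIT


def make_runner(max_wait):
  # class attributes in the subclass shadow the defaults without mutating Base
  class Provided(Base):
    MAX_WAIT = max_wait
  return Provided()


assert Base().wait() == 60
assert make_runner(5).wait() == 5

This keeps the runner class itself free of extra constructor arguments while still letting the provider override the polling and preemption constants per deployment.
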
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/tools/java/organize_imports.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/java/organize_imports.py b/src/main/python/apache/aurora/tools/java/organize_imports.py
new file mode 100644
index 0000000..48a0821
--- /dev/null
+++ b/src/main/python/apache/aurora/tools/java/organize_imports.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+'''
+Organizes a Java source file's import statements in a way that pleases Twitter's checkstyle
+configuration.
+This expects exactly one argument: the name of the file to modify with the preferred import
+ordering.
+'''
+
+from __future__ import print_function
+
+import re
+import sys
+from collections import defaultdict
+
+
+IMPORT_RE = re.compile('import(?: static)? (.*);')
+def get_group(import_statement):
+  matcher = IMPORT_RE.match(import_statement)
+  assert matcher, 'Could not parse import statement: %s' % import_statement
+  class_name = matcher.group(1)
+  if class_name.startswith('com.twitter'):
+    group = 'com.twitter'
+  else:
+    group = class_name.split('.')[0]
+  return group
+
+
+def index_by_group(import_statements):
+  groups = defaultdict(list)
+  for line in import_statements:
+    groups[get_group(line)].append(line)
+  return groups
+
+
+IMPORT_CLASS_RE = re.compile(
+    'import(?: static)? (?P<outer>[^A-Z]*[A-Z]\w+)(?:\.(?P<inners>[\w][^;]*))?')
+def get_all_group_lines(import_groups):
+  if not import_groups:
+    return []
+
+  def get_group_lines(group):
+    def comparator(x, y):
+      # These shenanigans are used to properly order imports for inner classes.
+      # So we get ordering like:
+      # import com.foo.Bar;
+      # import com.foo.Bar.Baz;
+      # (this is not lexicographical, so normal sort won't suffice)
+      x_m = IMPORT_CLASS_RE.match(x)
+      y_m = IMPORT_CLASS_RE.match(y)
+      if x_m.group('outer') == y_m.group('outer'):
+        return cmp(x_m.group('inners'), y_m.group('inners'))
+      else:
+        return cmp(x, y)
+    lines = sorted(import_groups[group], comparator)
+    lines.append('')
+    return lines 
+
+  all_lines = []
+  explicit_groups = ['java', 'javax', 'scala', 'com', 'net', 'org', 'com.twitter']
+  for group in explicit_groups:
+    if group in import_groups:
+      all_lines += get_group_lines(group)
+
+  # Gather remaining groups.
+  remaining_groups = sorted(set(import_groups.keys()) - set(explicit_groups))
+  for group in remaining_groups:
+    all_lines += get_group_lines(group)
+  return all_lines
+
+
+if len(sys.argv) != 2:
+  print('usage: %s FILE' % sys.argv[0])
+  sys.exit(1)
+
+BEFORE_IMPORTS = 'before_imports'
+IMPORTS = 'imports'
+STATIC_IMPORTS = 'static_imports'
+AFTER_IMPORTS = 'after_imports'
+
+print('Organizing imports in %s' % sys.argv[1])
+lines_before_imports = []
+import_lines = []
+static_import_lines = []
+lines_after_imports = []
+with open(sys.argv[1], 'r') as f:
+  position = BEFORE_IMPORTS
+  for line in f:
+    line = line.rstrip()
+    if position == BEFORE_IMPORTS:
+      if line.startswith('import'):
+        position = IMPORTS
+      else:
+        lines_before_imports.append(line)
+    if position == IMPORTS:
+      if line.startswith('import static'):
+        position = STATIC_IMPORTS
+      elif line.startswith('import'):
+        import_lines.append(line)
+      elif line.strip():
+        position = AFTER_IMPORTS
+    if position == STATIC_IMPORTS:
+      if line.startswith('import static'):
+        static_import_lines.append(line)
+      elif line.strip():
+        position = AFTER_IMPORTS
+    if position == AFTER_IMPORTS:
+      lines_after_imports.append(line)
+
+import_groups = index_by_group(import_lines)
+static_import_groups = index_by_group(static_import_lines)
+
+def ensure_line_padding(lines):
+  if lines and lines[-1] != '':
+    lines.append('')
+  return lines
+
+file_lines = lines_before_imports
+if import_groups:
+  ensure_line_padding(file_lines)
+  file_lines += get_all_group_lines(import_groups)
+if static_import_groups:
+  ensure_line_padding(file_lines)
+  file_lines += get_all_group_lines(static_import_groups)
+if lines_after_imports:
+  ensure_line_padding(file_lines)
+  file_lines += lines_after_imports
+
+with open(sys.argv[1], 'w') as f:
+  for line in file_lines:
+    print(line, file=f)

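For a handful of sample imports, the ordering the script produces looks like this. The sketch below approximates the cmp()-based comparator with a sort key and made-up import statements; it illustrates the grouping and inner-class ordering, not the script's exact algorithm:

imports = [
  'import com.twitter.common.base.MorePreconditions;',
  'import java.util.List;',
  'import org.junit.Test;',
  'import com.foo.Bar.Baz;',
  'import com.foo.Bar;',
]
group_order = ['java', 'javax', 'scala', 'com', 'net', 'org', 'com.twitter']


def sort_key(stmt):
  name = stmt[len('import '):-1]
  group = 'com.twitter' if name.startswith('com.twitter') else name.split('.')[0]
  return (group_order.index(group), name.split('.'))


for line in sorted(imports, key=sort_key):
  print(line)
# -> java.util.List, then com.foo.Bar, com.foo.Bar.Baz, then org.junit.Test,
#    then com.twitter...; the real script also inserts a blank line between groups.
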
