aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [12/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/status_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/status_checker.py b/src/main/python/twitter/aurora/executor/common/status_checker.py
deleted file mode 100644
index efffa8f..0000000
--- a/src/main/python/twitter/aurora/executor/common/status_checker.py
+++ /dev/null
@@ -1,111 +0,0 @@
-from abc import abstractmethod, abstractproperty
-
-from twitter.common import log
-from twitter.common.lang import Interface
-from twitter.common.metrics import NamedGauge, Observable
-
-
-# This mirrors mesos_pb2 TaskStatus without explicitly depending upon it.
-#
-# The dependency is a 30MB egg, so for smaller applications that just need
-# the status text, we proxy them.  The actual conversion between ExitState
-# and TaskStatus is done in the StatusManager.
-class ExitState(object):
-  FAILED = object()
-  FINISHED = object()
-  KILLED = object()
-  LOST = object()
-
-  ALL_STATES = {
-    FAILED: 'FAILED',
-    FINISHED: 'FINISHED',
-    KILLED: 'KILLED',
-    LOST: 'LOST',
-  }
-
-
-class StatusResult(object):
-  """
-    Encapsulates a reason for failure and an optional status which defaults to
-    ExitState.FAILED.
-  """
-
-  def __init__(self, reason, status):
-    self._reason = reason
-    if status not in ExitState.ALL_STATES:
-      raise ValueError('Unknown task state: %r' % status)
-    self._status = status
-
-  @property
-  def reason(self):
-    return self._reason
-
-  @property
-  def status(self):
-    return self._status
-
-  def __repr__(self):
-    return '%s(%r, status=%r)' % (
-        self.__class__.__name__,
-        self._reason,
-        ExitState.ALL_STATES[self._status])
-
-
-class StatusChecker(Observable, Interface):
-  """Interface to pluggable status checkers for the Aurora Executor."""
-
-  @abstractproperty
-  def status(self):
-    """Return None under normal operations.  Return StatusResult to indicate status proposal."""
-
-  def start(self):
-    """Invoked once the task has been started."""
-    self.metrics.register(NamedGauge('enabled', 1))
-
-  def stop(self):
-    """Invoked once a non-None status has been reported."""
-    pass
-
-
-class StatusCheckerProvider(Interface):
-  @abstractmethod
-  def from_assigned_task(self, assigned_task, sandbox):
-    pass
-
-
-class Healthy(StatusChecker):
-  @property
-  def status(self):
-    return None
-
-
-class ChainedStatusChecker(StatusChecker):
-  def __init__(self, status_checkers):
-    self._status_checkers = status_checkers
-    self._status = None
-    if not all(isinstance(h_i, StatusChecker) for h_i in status_checkers):
-      raise TypeError('ChainedStatusChecker must take an iterable of StatusCheckers.')
-    super(ChainedStatusChecker, self).__init__()
-
-  @property
-  def status(self):
-    if self._status is None:
-      for status_checker in self._status_checkers:
-        log.debug('Checking status from %s' % status_checker.__class__.__name__)
-        status_checker_status = status_checker.status
-        if status_checker_status is not None:
-          log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status))
-          if not isinstance(status_checker_status, StatusResult):
-            raise TypeError('StatusChecker returned something other than a StatusResult: got %s' %
-                type(status_checker_status))
-          self._status = status_checker_status
-          break
-    return self._status
-
-  def start(self):
-    for status_checker in self._status_checkers:
-      status_checker.start()
-
-  def stop(self):
-    for status_checker in self._status_checkers:
-      status_checker.stop()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/task_info.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/task_info.py b/src/main/python/twitter/aurora/executor/common/task_info.py
deleted file mode 100644
index 1a7cf27..0000000
--- a/src/main/python/twitter/aurora/executor/common/task_info.py
+++ /dev/null
@@ -1,90 +0,0 @@
-import json
-
-from twitter.aurora.config.port_resolver import PortResolver
-from twitter.aurora.config.schema.base import MesosJob, MesosTaskInstance
-from twitter.aurora.config.thrift import task_instance_from_job
-from twitter.common import log
-
-from gen.twitter.aurora.ttypes import AssignedTask
-
-from pystachio import Ref
-from thrift.Thrift import TException
-from thrift.TSerialization import deserialize as thrift_deserialize
-
-
-def assigned_task_from_mesos_task(task):
-  """Deserialize AssignedTask from a launchTask task protocol buffer."""
-  try:
-    assigned_task = thrift_deserialize(AssignedTask(), task.data)
-  except (EOFError, TException) as e:
-    raise ValueError('Could not deserialize task! %s' % e)
-  return assigned_task
-
-
-def mesos_job_from_assigned_task(assigned_task):
-  """Deserialize a MesosJob pystachio struct from an AssignedTask."""
-  thermos_task = assigned_task.task.executorConfig.data
-  try:
-    json_blob = json.loads(thermos_task)
-  except (TypeError, ValueError):
-    return None
-  if 'instance' in json_blob:
-    # This is a MesosTaskInstance so we cannot get a MesosJob from this assigned_task
-    return None
-  return MesosJob.json_loads(thermos_task)
-
-
-def mesos_task_instance_from_assigned_task(assigned_task):
-  """Deserialize MesosTaskInstance from an AssignedTask thrift."""
-  thermos_task = assigned_task.task.executorConfig.data
-
-  if not thermos_task:
-    raise ValueError('Task did not have a thermos config!')
-
-  try:
-    json_blob = json.loads(thermos_task)
-  except (TypeError, ValueError) as e:
-    raise ValueError('Could not deserialize thermos config: %s' % e)
-
-  # As part of the transition for MESOS-2133, we can send either a MesosTaskInstance
-  # or we can be sending a MesosJob.  So handle both possible cases.  Once everyone
-  # is using MesosJob, then we can begin to leverage additional information that
-  # becomes available such as cluster.
-  if 'instance' in json_blob:
-    return MesosTaskInstance.json_loads(thermos_task)
-
-  # This is a MesosJob
-  mti, refs = task_instance_from_job(MesosJob.json_loads(thermos_task), assigned_task.instanceId)
-  for ref in refs:
-    # If the ref is {{thermos.task_id}} or a subscope of
-    # {{thermos.ports}}, it currently gets bound by the Thermos Runner,
-    # so we must leave them unbound.
-    #
-    # {{thermos.user}} is a legacy binding which we can safely ignore.
-    #
-    # TODO(wickman) These should be rewritten by the mesos client to use
-    # %%style%% replacements in order to allow us to better type-check configs
-    # client-side.
-    if ref == Ref.from_address('thermos.task_id'):
-      continue
-    if Ref.subscope(Ref.from_address('thermos.ports'), ref):
-      continue
-    if ref == Ref.from_address('thermos.user'):
-      continue
-    raise ValueError('Unexpected unbound refs: %s' % ' '.join(map(str, refs)))
-  return mti
-
-
-def resolve_ports(mesos_task, portmap):
-  """Given a MesosTaskInstance and the portmap of resolved ports from the scheduler,
-     create a fully resolved map of port name => port number for the thermos
-     runner and discovery manager."""
-  task_portmap = mesos_task.announce().portmap().get() if mesos_task.has_announce() else {}
-  task_portmap.update(portmap)
-  task_portmap = PortResolver.resolve(task_portmap)
-
-  for name, port in task_portmap.items():
-    if not isinstance(port, int):
-      log.warning('Task has unmapped port: %s => %s' % (name, port))
-
-  return dict((name, port) for (name, port) in task_portmap.items() if isinstance(port, int))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/task_runner.py b/src/main/python/twitter/aurora/executor/common/task_runner.py
deleted file mode 100644
index c7a57cb..0000000
--- a/src/main/python/twitter/aurora/executor/common/task_runner.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from abc import abstractmethod
-
-from twitter.common.lang import Interface
-
-from .status_checker import StatusChecker
-
-
-class TaskError(Exception):
-  pass
-
-
-class TaskRunner(StatusChecker):
-  # For now, TaskRunner should just maintain the StatusChecker API.
-  pass
-
-
-class TaskRunnerProvider(Interface):
-  @abstractmethod
-  def from_assigned_task(self, assigned_task, sandbox):
-    pass

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/executor_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/executor_base.py b/src/main/python/twitter/aurora/executor/executor_base.py
deleted file mode 100644
index 4fd9aa4..0000000
--- a/src/main/python/twitter/aurora/executor/executor_base.py
+++ /dev/null
@@ -1,103 +0,0 @@
-from twitter.common import log
-
-from gen.twitter.aurora.constants import TERMINAL_STATES as AURORA_TERMINAL_STATES
-from gen.twitter.aurora.ttypes import ScheduleStatus
-from gen.twitter.thermos.ttypes import TaskState
-
-import mesos
-import mesos_pb2 as mesos_pb
-
-
-class ThermosExecutorBase(mesos.Executor):
-  # Statuses are hard, let's go shopping.
-  MESOS_STATES = {
-      mesos_pb.TASK_STARTING: 'STARTING',
-      mesos_pb.TASK_RUNNING: 'RUNNING',
-      mesos_pb.TASK_FINISHED: 'FINISHED',
-      mesos_pb.TASK_FAILED: 'FAILED',
-      mesos_pb.TASK_KILLED: 'KILLED',
-      mesos_pb.TASK_LOST: 'LOST',
-  }
-
-  THERMOS_TO_MESOS_STATES = {
-      TaskState.ACTIVE: mesos_pb.TASK_RUNNING,
-      TaskState.SUCCESS: mesos_pb.TASK_FINISHED,
-      TaskState.FAILED: mesos_pb.TASK_FAILED,
-      TaskState.KILLED: mesos_pb.TASK_KILLED,
-      TaskState.LOST: mesos_pb.TASK_LOST,
-  }
-
-  THERMOS_TO_TWITTER_STATES = {
-      TaskState.ACTIVE: ScheduleStatus.RUNNING,
-      TaskState.CLEANING: ScheduleStatus.RUNNING,
-      TaskState.FINALIZING: ScheduleStatus.RUNNING,
-      TaskState.SUCCESS: ScheduleStatus.FINISHED,
-      TaskState.FAILED: ScheduleStatus.FAILED,
-      TaskState.KILLED: ScheduleStatus.KILLED,
-      TaskState.LOST: ScheduleStatus.LOST,
-  }
-
-  @staticmethod
-  def twitter_status_is_terminal(status):
-    return status in AURORA_TERMINAL_STATES
-
-  @staticmethod
-  def mesos_status_is_terminal(status):
-    return status in (
-        mesos_pb.TASK_FAILED,
-        mesos_pb.TASK_FINISHED,
-        mesos_pb.TASK_KILLED,
-        mesos_pb.TASK_LOST,
-    )
-
-  @staticmethod
-  def thermos_status_is_terminal(status):
-    return status in (
-        TaskState.FAILED,
-        TaskState.KILLED,
-        TaskState.LOST,
-        TaskState.SUCCESS,
-    )
-
-  def __init__(self):
-    self._slave_id = None
-
-  def log(self, msg):
-    log.info('Executor [%s]: %s' % (self._slave_id, msg))
-
-  def registered(self, driver, executor_info, framework_info, slave_info):
-    self.log('registered() called with:')
-    self.log('   ExecutorInfo:  %s' % executor_info)
-    self.log('   FrameworkInfo: %s' % framework_info)
-    self.log('   SlaveInfo:     %s' % slave_info)
-    self._driver = driver
-    self._executor_info = executor_info
-    self._framework_info = framework_info
-    self._slave_info = slave_info
-
-  def reregistered(self, driver, slave_info):
-    self.log('reregistered() called with:')
-    self.log('   SlaveInfo:     %s' % slave_info)
-
-  def disconnected(self, driver):
-    self.log('disconnected() called')
-
-  def send_update(self, driver, task_id, state, message=None):
-    update = mesos_pb.TaskStatus()
-    if not isinstance(state, int):
-      raise TypeError('Invalid state type %s, should be int.' % type(state))
-    if state not in self.MESOS_STATES:
-      raise ValueError('Invalid state: %s' % state)
-    update.state = state
-    update.task_id.value = task_id
-    if message:
-      update.message = str(message)
-    self.log('Updating %s => %s' % (task_id, self.MESOS_STATES[state]))
-    self.log('   Reason: %s' % message)
-    driver.sendStatusUpdate(update)
-
-  def frameworkMessage(self, driver, message):
-    self.log('frameworkMessage() got message: %s, ignoring.' % message)
-
-  def error(self, driver, message):
-    self.log('Received error message: %s' % message)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/executor_detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/executor_detector.py b/src/main/python/twitter/aurora/executor/executor_detector.py
deleted file mode 100644
index c26c00e..0000000
--- a/src/main/python/twitter/aurora/executor/executor_detector.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import os
-from glob import glob
-
-from twitter.common.string import ScanfParser
-
-
-# TODO(wickman) MESOS-2805  This makes an assumption about the directory
-# layout Mesos provides.  Ideally we never need to do this and we should
-# work with the Mesos core team to make it unnecessary.
-class ExecutorDetector(object):
-  class Error(Exception): pass
-  class CannotFindRoot(Error): pass
-
-  LOG_PATH = 'executor_logs'
-  RESOURCE_PATH = 'resource_usage.recordio'
-  VARS_PATH = 'executor_vars.json'
-  PATTERN = [
-      '%(root)s',
-      'slaves',
-      '%(slave_id)s',
-      'frameworks',
-      '%(framework_id)s',
-      'executors',
-      '%(executor_id)s',
-      'runs',
-      '%(run)s']
-  EXTRACTOR = ScanfParser(os.path.join(*PATTERN))
-
-  @classmethod
-  def find_root(cls, path):
-    """Does this path appear to match the executor directory pattern?"""
-
-    def root_from_path(path):
-      path = os.path.normpath(path)
-      path_vector = path.split(os.path.sep)
-      pattern_vector = cls.PATTERN
-      if len(path_vector) < len(pattern_vector):
-        return None
-      for pattern, path_component in zip(reversed(pattern_vector), reversed(path_vector)):
-        if pattern.startswith('%'):
-          continue
-        if path_component != pattern:
-          return None
-      matched_path = os.path.join(*path_vector[-len(pattern_vector) + 1:])
-      return os.path.normpath(path[:-len(matched_path)])
-
-    while path != os.path.dirname(path):
-      root = root_from_path(path)
-      if root:
-        return root
-      path = os.path.dirname(path)
-
-  @classmethod
-  def match(cls, path):
-    try:
-      return cls.EXTRACTOR.parse(path)
-    except ScanfParser.ParseError:
-      return None
-
-  @classmethod
-  def path(cls, result):
-    return os.path.join(*cls.PATTERN) % result.groups()
-
-  @classmethod
-  def find(cls, root, slave_id='*', framework_id='*', executor_id='*', run='*'):
-    mixins = dict(
-        root=root, slave_id=slave_id, framework_id=framework_id, executor_id=executor_id, run=run)
-    return filter(None, map(cls.match, glob(os.path.join(*cls.PATTERN) % mixins)))
-
-  def __init__(self, root=None):
-    self.root = root or self.find_root(os.getcwd())
-    if self.root is None:
-      raise self.CannotFindRoot('Not a valid executor root!')
-
-  def __iter__(self):
-    for extraction in self.find(root=self.root):
-      yield extraction

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/executor_vars.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/executor_vars.py b/src/main/python/twitter/aurora/executor/executor_vars.py
deleted file mode 100644
index 7f54d8f..0000000
--- a/src/main/python/twitter/aurora/executor/executor_vars.py
+++ /dev/null
@@ -1,123 +0,0 @@
-import os
-import time
-
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.metrics import (
-    LambdaGauge,
-    MutatorGauge,
-    NamedGauge,
-    Observable)
-from twitter.common.python.dirwrapper import PythonDirectoryWrapper
-from twitter.common.python.pex import PexInfo
-from twitter.common.quantity import Amount, Time
-from twitter.common.string.scanf import ScanfParser
-
-import psutil
-
-
-class ExecutorVars(Observable, ExceptionalThread):
-  """
-    Executor exported /vars wrapper.
-
-    Currently writes to disk and communicates through the Aurora Observer,
-    pending MESOS-433.
-  """
-  MUTATOR_METRICS = ('rss', 'cpu', 'thermos_pss', 'thermos_cpu')
-  RELEASE_TAG_FORMAT = ScanfParser('%(project)s_R%(release)d')
-  DEPLOY_TAG_FORMAT = ScanfParser('%(project)s_%(environment)s_%(release)d_R%(deploy)d')
-  PROJECT_NAMES = ('thermos', 'thermos_executor')
-  COLLECTION_INTERVAL = Amount(1, Time.MINUTES)
-
-  @classmethod
-  def get_release_from_tag(cls, tag):
-    def parse_from(parser):
-      try:
-        scanf = parser.parse(tag)
-        if scanf and scanf.project in cls.PROJECT_NAMES:
-          return scanf.release
-      except ScanfParser.ParseError:
-        pass
-    release = parse_from(cls.RELEASE_TAG_FORMAT)
-    if release is None:
-      release = parse_from(cls.DEPLOY_TAG_FORMAT)
-    if release is None:
-      release = 'UNKNOWN'
-    return release
-
-  @classmethod
-  def get_release_from_binary(cls, binary):
-    try:
-      pex_info = PexInfo.from_pex(PythonDirectoryWrapper.get(binary))
-      return cls.get_release_from_tag(pex_info.build_properties.get('tag', ''))
-    except PythonDirectoryWrapper.Error:
-      return 'UNKNOWN'
-
-  def __init__(self, clock=time):
-    self._clock = clock
-    self._self = psutil.Process(os.getpid())
-    if hasattr(self._self, 'getcwd'):
-      self._version = self.get_release_from_binary(
-        os.path.join(self._self.getcwd(), self._self.cmdline[1]))
-    else:
-      self._version = 'UNKNOWN'
-    self.metrics.register(NamedGauge('version', self._version))
-    self._orphan = False
-    self.metrics.register(LambdaGauge('orphan', lambda: int(self._orphan)))
-    self._metrics = dict((metric, MutatorGauge(metric, 0)) for metric in self.MUTATOR_METRICS)
-    for metric in self._metrics.values():
-      self.metrics.register(metric)
-    ExceptionalThread.__init__(self)
-    self.daemon = True
-
-  def write_metric(self, metric, value):
-    self._metrics[metric].write(value)
-
-  @classmethod
-  def thermos_children(cls, parent):
-    try:
-      for child in parent.get_children():
-        yield child  # thermos_runner
-        try:
-          for grandchild in child.get_children():
-            yield grandchild  # thermos_coordinator
-        except psutil.Error:
-          continue
-    except psutil.Error:
-      return
-
-  @classmethod
-  def aggregate_memory(cls, process, attribute='pss'):
-    try:
-      return sum(getattr(mmap, attribute) for mmap in process.get_memory_maps())
-    except (psutil.Error, AttributeError):
-      # psutil on OS X does not support get_memory_maps
-      return 0
-
-  @classmethod
-  def cpu_rss_pss(cls, process):
-    return (process.get_cpu_percent(0),
-            process.get_memory_info().rss,
-            cls.aggregate_memory(process, attribute='pss'))
-
-  def run(self):
-    while True:
-      self._clock.sleep(self.COLLECTION_INTERVAL.as_(Time.SECONDS))
-      self.sample()
-
-  def sample(self):
-    try:
-      executor_cpu, executor_rss, _ = self.cpu_rss_pss(self._self)
-      self.write_metric('cpu', executor_cpu)
-      self.write_metric('rss', executor_rss)
-      self._orphan = self._self.ppid == 1
-    except psutil.Error:
-      return False
-
-    try:
-      child_stats = map(self.cpu_rss_pss, self.thermos_children(self._self))
-      self.write_metric('thermos_cpu', sum(stat[0] for stat in child_stats))
-      self.write_metric('thermos_pss', sum(stat[2] for stat in child_stats))
-    except psutil.Error:
-      pass
-
-    return True

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/gc_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/gc_executor.py b/src/main/python/twitter/aurora/executor/gc_executor.py
deleted file mode 100644
index f17d0a2..0000000
--- a/src/main/python/twitter/aurora/executor/gc_executor.py
+++ /dev/null
@@ -1,499 +0,0 @@
-"""Thermos garbage-collection (GC) executor
-
-This module contains the Thermos GC executor, responsible for garbage collecting old tasks and
-reconciling task states with the Mesos scheduler. It is intended to be run periodically on Mesos
-slaves utilising the Thermos executor.
-
-"""
-
-import os
-import threading
-import time
-
-from twitter.common.collections import OrderedDict
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.metrics import Observable
-from twitter.common.metrics.gauge import AtomicGauge
-from twitter.common.quantity import Amount, Time
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.core.inspector import CheckpointInspector
-from twitter.thermos.core.helper import TaskKiller
-from twitter.thermos.monitoring.detector import TaskDetector
-from twitter.thermos.monitoring.garbage import TaskGarbageCollector
-
-from gen.twitter.aurora.comm.ttypes import (
-    AdjustRetainedTasks,
-    DeletedTasks,
-    SchedulerMessage)
-from gen.twitter.aurora.ttypes import ScheduleStatus
-
-from .common.sandbox import DirectorySandbox, SandboxInterface
-from .executor_base import ThermosExecutorBase
-from .executor_detector import ExecutorDetector
-
-import mesos_pb2 as mesos_pb
-import psutil
-from thrift.TSerialization import deserialize as thrift_deserialize
-from thrift.TSerialization import serialize as thrift_serialize
-
-
-class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
-  """
-    Thermos GC Executor, responsible for:
-      - garbage collecting old tasks to make sure they don't clutter up the system
-      - state reconciliation with the scheduler (in case it thinks we're running
-        something we're not or vice versa.)
-  """
-  MAX_PID_TIME_DRIFT = Amount(10, Time.SECONDS)
-  MAX_CHECKPOINT_TIME_DRIFT = Amount(1, Time.HOURS)  # maximum runner disconnection time
-
-  # how old a task must be before we're willing to kill it, assuming that there could be
-  # slight races in the following scenario:
-  #    launch gc with retained_tasks={t1, t2, t3}
-  #    launch task t4
-  MINIMUM_KILL_AGE = Amount(10, Time.MINUTES)
-
-  # wait time between checking for new GC events from the slave and/or cleaning orphaned tasks
-  POLL_WAIT = Amount(5, Time.MINUTES)
-
-  # maximum amount of time the executor will wait with no tasks before it exits.
-  MAXIMUM_EXECUTOR_WAIT = Amount(15, Time.MINUTES)
-
-  # maximum lifetime of this executor.  this is to prevent older GC executor binaries from
-  # running forever
-  MAXIMUM_EXECUTOR_LIFETIME = Amount(1, Time.DAYS)
-
-  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
-
-  def __init__(self,
-               checkpoint_root,
-               verbose=True,
-               task_killer=TaskKiller,
-               executor_detector=ExecutorDetector,
-               task_garbage_collector=TaskGarbageCollector,
-               clock=time):
-    ThermosExecutorBase.__init__(self)
-    ExceptionalThread.__init__(self)
-    self.daemon = True
-    self._stop_event = threading.Event()
-    self._gc_task_queue = OrderedDict() # mapping of task_id => (TaskInfo, AdjustRetainedTasks), in
-                                        # the order in which they were received via a launchTask.
-    self._driver = None   # cache the ExecutorDriver provided by the slave, so we can use it
-                          # out of band from slave-initiated callbacks. This should be supplied by
-                          # ThermosExecutorBase.registered() when the executor first registers with
-                          # the slave.
-    self._slave_id = None # cache the slave ID provided by the slave
-    self._task_id = None  # the task_id currently being executed by the ThermosGCExecutor, if any
-    self._start_time = None # the start time of a task currently being executed, if any
-    self._detector = executor_detector()
-    self._collector = task_garbage_collector(root=checkpoint_root)
-    self._clock = clock
-    self._task_killer = task_killer
-    self._checkpoint_root = checkpoint_root
-    self._dropped_tasks = AtomicGauge('dropped_tasks')
-    self.metrics.register(self._dropped_tasks)
-
-  def _runner_ckpt(self, task_id):
-    """Return the runner checkpoint file for a given task_id."""
-    return TaskPath(root=self._checkpoint_root, task_id=task_id).getpath('runner_checkpoint')
-
-  def _terminate_task(self, task_id, kill=True):
-    """Terminate a task using the associated task killer. Returns a boolean indicating success."""
-    killer = self._task_killer(task_id, self._checkpoint_root)
-    self.log('Terminating %s...' % task_id)
-    runner_terminate = killer.kill if kill else killer.lose
-    try:
-      runner_terminate(force=True)
-      return True
-    except Exception as e:
-      self.log('Could not terminate: %s' % e)
-      return False
-
-  def partition_tasks(self):
-    """Return active/finished tasks as discovered from the checkpoint root."""
-    detector = TaskDetector(root=self._checkpoint_root)
-    active_tasks = set(t_id for _, t_id in detector.get_task_ids(state='active'))
-    finished_tasks = set(t_id for _, t_id in detector.get_task_ids(state='finished'))
-    return active_tasks, finished_tasks
-
-  def get_states(self, task_id):
-    """Returns the (timestamp, status) tuples of the task or [] if could not replay."""
-    statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task_id))
-    try:
-      return [(state.timestamp_ms / 1000.0, state.state) for state in statuses]
-    except CheckpointDispatcher.ErrorRecoveringState:
-      return []
-
-  def get_sandbox(self, task_id):
-    """Returns the sandbox of the task, or None if it has not yet been initialized."""
-    try:
-      for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task_id)):
-        if update.runner_header and update.runner_header.sandbox:
-          return update.runner_header.sandbox
-    except CheckpointDispatcher.ErrorRecoveringState:
-      return None
-
-  def maybe_terminate_unknown_task(self, task_id):
-    """Terminate a task if we believe the scheduler doesn't know about it.
-
-       It's possible for the scheduler to queue a GC and launch a task afterwards, in which
-       case we may see actively running tasks that the scheduler did not report in the
-       AdjustRetainedTasks message.
-
-       Returns:
-         boolean indicating whether the task was terminated
-    """
-    states = self.get_states(task_id)
-    if states:
-      task_start_time, _ = states[0]
-      if self._start_time - task_start_time > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-        return self._terminate_task(task_id)
-    return False
-
-  def should_gc_task(self, task_id):
-    """Check if a possibly-corrupt task should be locally GCed
-
-      A task should be GCed if its checkpoint stream appears to be corrupted and the kill age
-      threshold is exceeded.
-
-       Returns:
-         set, containing the task_id if it should be marked for local GC, or empty otherwise
-    """
-    runner_ckpt = self._runner_ckpt(task_id)
-    if not os.path.exists(runner_ckpt):
-      return set()
-    latest_update = os.path.getmtime(runner_ckpt)
-    if self._start_time - latest_update > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-      self.log('Got corrupt checkpoint file for %s - marking for local GC' % task_id)
-      return set([task_id])
-    else:
-      self.log('Checkpoint file unreadable, but not yet beyond MINIMUM_KILL_AGE threshold')
-      return set()
-
-  def reconcile_states(self, driver, retained_tasks):
-    """Reconcile states that the scheduler thinks tasks are in vs what they really are in.
-
-        Local    vs   Scheduler  => Action
-       ===================================
-        ACTIVE         ACTIVE    => no-op
-        ACTIVE        STARTING   => no-op
-        ACTIVE        TERMINAL   => maybe kill task*
-        ACTIVE        !EXISTS    => maybe kill task*
-       TERMINAL        ACTIVE    => send actual status**
-       TERMINAL       STARTING   => send actual status**
-       TERMINAL       TERMINAL   => no-op
-       TERMINAL       !EXISTS    => gc locally
-       !EXISTS         ACTIVE    => send LOST**
-       !EXISTS        STARTING   => no-op
-       !EXISTS        TERMINAL   => gc remotely
-
-       * - Only kill if this does not appear to be a race condition.
-       ** - These appear to have no effect
-
-       Side effecting operations:
-         ACTIVE   | (TERMINAL / !EXISTS) => maybe kill
-         TERMINAL | !EXISTS              => delete
-         !EXISTS  | TERMINAL             => delete
-
-      Returns tuple of (local_gc, remote_gc, updates), where:
-        local_gc - set of task_ids to be GCed locally
-        remote_gc - set of task_ids to be deleted on the scheduler
-        updates - dictionary of updates sent to the scheduler (task_id: ScheduleStatus)
-    """
-    def partition(rt):
-      active, starting, finished = set(), set(), set()
-      for task_id, schedule_status in rt.items():
-        if self.twitter_status_is_terminal(schedule_status):
-          finished.add(task_id)
-        elif (schedule_status == ScheduleStatus.STARTING or
-              schedule_status == ScheduleStatus.ASSIGNED):
-          starting.add(task_id)
-        else:
-          active.add(task_id)
-      return active, starting, finished
-
-    local_active, local_finished = self.partition_tasks()
-    sched_active, sched_starting, sched_finished = partition(retained_tasks)
-    local_task_ids = local_active | local_finished
-    sched_task_ids = sched_active | sched_starting | sched_finished
-    all_task_ids = local_task_ids | sched_task_ids
-
-    self.log('Told to retain the following task ids:')
-    for task_id, schedule_status in retained_tasks.items():
-      self.log('  => %s as %s' % (task_id, ScheduleStatus._VALUES_TO_NAMES[schedule_status]))
-
-    self.log('Local active tasks:')
-    for task_id in local_active:
-      self.log('  => %s' % task_id)
-
-    self.log('Local finished tasks:')
-    for task_id in local_finished:
-      self.log('  => %s' % task_id)
-
-    local_gc, remote_gc = set(), set()
-    updates = {}
-
-    for task_id in all_task_ids:
-      if task_id in local_active and task_id not in (sched_active | sched_starting):
-        self.log('Inspecting task %s for termination.' % task_id)
-        if not self.maybe_terminate_unknown_task(task_id):
-          local_gc.update(self.should_gc_task(task_id))
-      if task_id in local_finished and task_id not in sched_task_ids:
-        self.log('Queueing task %s for local deletion.' % task_id)
-        local_gc.add(task_id)
-      if task_id in local_finished and task_id in (sched_active | sched_starting):
-        self.log('Task %s finished but scheduler thinks active/starting.' % task_id)
-        states = self.get_states(task_id)
-        if states:
-          _, last_state = states[-1]
-          updates[task_id] = self.THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN)
-          self.send_update(
-              driver,
-              task_id,
-              self.THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST),
-              'Task finish detected by GC executor.')
-        else:
-          local_gc.update(self.should_gc_task(task_id))
-      if task_id in sched_finished and task_id not in local_task_ids:
-        self.log('Queueing task %s for remote deletion.' % task_id)
-        remote_gc.add(task_id)
-      if task_id not in local_task_ids and task_id in sched_active:
-        self.log('Know nothing about task %s, telling scheduler of LOSS.' % task_id)
-        updates[task_id] = ScheduleStatus.LOST
-        self.send_update(driver, task_id, mesos_pb.TASK_LOST, 'GC executor found no trace of task.')
-      if task_id not in local_task_ids and task_id in sched_starting:
-        self.log('Know nothing about task %s, but scheduler says STARTING - passing' % task_id)
-
-    return local_gc, remote_gc, updates
-
-  def clean_orphans(self, driver):
-    """Inspect checkpoints for trees that have been kill -9'ed but not properly cleaned up."""
-    self.log('Checking for orphaned tasks')
-    active_tasks, _ = self.partition_tasks()
-    updates = {}
-
-    inspector = CheckpointInspector(self._checkpoint_root)
-
-    def is_our_process(process, uid, timestamp):
-      if process.uids.real != uid:
-        return False
-      estimated_start_time = self._clock.time() - process.create_time
-      return abs(timestamp - estimated_start_time) < self.MAX_PID_TIME_DRIFT.as_(Time.SECONDS)
-
-    for task_id in active_tasks:
-      self.log('Inspecting running task: %s' % task_id)
-      inspection = inspector.inspect(task_id)
-      if not inspection:
-        self.log('  - Error inspecting task runner')
-        continue
-      latest_runner = inspection.runner_processes[-1]
-      # Assume that it has not yet started?
-      if not latest_runner:
-        self.log('  - Task has no registered runners.')
-        continue
-      runner_pid, runner_uid, timestamp_ms = latest_runner
-      try:
-        runner_process = psutil.Process(runner_pid)
-        if is_our_process(runner_process, runner_uid, timestamp_ms / 1000.0):
-          self.log('  - Runner appears healthy.')
-          continue
-      except psutil.NoSuchProcess:
-        # Runner is dead
-        pass
-      except psutil.Error as err:
-        self.log('  - Error sampling runner process [pid=%s]: %s' % (runner_pid, err))
-        continue
-      try:
-        latest_update = os.path.getmtime(self._runner_ckpt(task_id))
-      except (IOError, OSError) as err:
-        self.log('  - Error accessing runner ckpt: %s' % err)
-        continue
-      if self._clock.time() - latest_update < self.MAX_CHECKPOINT_TIME_DRIFT.as_(Time.SECONDS):
-        self.log('  - Runner is dead but under LOST threshold.')
-        continue
-      self.log('  - Runner is dead but beyond LOST threshold: %.1fs' % (
-          self._clock.time() - latest_update))
-      if self._terminate_task(task_id, kill=False):
-        updates[task_id] = ScheduleStatus.LOST
-        self.send_update(
-            driver, task_id, mesos_pb.TASK_LOST, 'GC executor detected failed task runner.')
-
-    return updates
-
-  def _erase_sandbox(self, task_id):
-    # TODO(wickman) Only mesos should be in the business of garbage collecting sandboxes.
-    header_sandbox = self.get_sandbox(task_id)
-    directory_sandbox = DirectorySandbox(header_sandbox) if header_sandbox else None
-    if directory_sandbox and directory_sandbox.exists():
-      self.log('Destroying DirectorySandbox for %s' % task_id)
-      directory_sandbox.destroy()
-    else:
-      self.log('Found no sandboxes for %s' % task_id)
-
-  def _gc(self, task_id):
-    """Erase the sandbox, logs and metadata of the given task."""
-    self.log('Erasing sandbox for %s' % task_id)
-    self._erase_sandbox(task_id)
-    self.log('Erasing logs for %s' % task_id)
-    self._collector.erase_logs(task_id)
-    self.log('Erasing metadata for %s' % task_id)
-    self._collector.erase_metadata(task_id)
-
-  def garbage_collect(self, force_delete=frozenset()):
-    """Garbage collect tasks on the system no longer active or in the supplied force_delete.
-
-    Return a set of task_ids representing the tasks that were garbage collected.
-    """
-    active_tasks, finished_tasks = self.partition_tasks()
-    retained_executors = set(iter(self.linked_executors))
-    self.log('Executor sandboxes retained by Mesos:')
-    if retained_executors:
-      for r_e in sorted(retained_executors):
-        self.log('  %s' % r_e)
-    else:
-      self.log('  None')
-    for task_id in (active_tasks - retained_executors):
-      self.log('ERROR: Active task %s had its executor sandbox pulled.' % task_id)
-    gc_tasks = (finished_tasks - retained_executors) | force_delete
-    for gc_task in gc_tasks:
-      self._gc(gc_task)
-    return gc_tasks
-
-  @property
-  def linked_executors(self):
-    """Generator yielding the executor sandboxes detected on the system."""
-    thermos_executor_prefix = 'thermos-'
-    for executor in self._detector:
-      # It's possible for just the 'latest' symlink to be present but no run directories.
-      # This indicates that the task has been fully garbage collected.
-      if executor.executor_id.startswith(thermos_executor_prefix) and executor.run != 'latest':
-        yield executor.executor_id[len(thermos_executor_prefix):]
-
-  def _run_gc(self, task, retain_tasks, retain_start):
-    """
-      Reconcile the set of tasks to retain (provided by the scheduler) with the current state of
-      executors on this system. Garbage collect tasks/executors as appropriate.
-
-      Not re-entrant! Previous executions must complete (and clear self._task_id) before this can be
-      invoked.
-
-      Potentially blocking (e.g. on I/O) in self.garbage_collect()
-
-      Args:
-        task: TaskInfo provided by the slave
-        retain_tasks: mapping of task_id => ScheduleStatus, describing what the scheduler thinks is
-                      running on this system
-        retain_start: the time at which the retain_tasks message is effective -- this means that
-                      tasks started after the retain_tasks message is effective are skipped
-                      until future GC runs.
-    """
-    task_id = task.task_id.value
-    if self._task_id is not None:
-      raise RuntimeError('_run_gc() called [task_id=%s], but already running [task_id=%s]'
-                         % (task_id, self._task_id))
-    self._task_id = task_id
-    self.log('Launching garbage collection [task_id=%s]' % task_id)
-    self._start_time = retain_start
-    local_gc, remote_gc, _ = self.reconcile_states(self._driver, retain_tasks)
-    deleted_tasks = set(retain_tasks).intersection(self.garbage_collect(local_gc)) | remote_gc
-    if deleted_tasks:
-      self._driver.sendFrameworkMessage(thrift_serialize(
-          SchedulerMessage(deletedTasks=DeletedTasks(taskIds=deleted_tasks))))
-    self.send_update(
-        self._driver, task.task_id.value, mesos_pb.TASK_FINISHED, 'Garbage collection finished.')
-    self.log('Garbage collection complete [task_id=%s]' % task_id)
-    self._task_id = self._start_time = None
-
-  def run(self):
-    """Main GC executor event loop.
-
-      Periodically perform state reconciliation with the set of tasks provided
-      by the slave, and garbage collect orphaned tasks on the system.
-    """
-    run_start = self._clock.time()
-    last_gc_run = self._clock.time()
-
-    def should_terminate():
-      now = self._clock.time()
-      if now > run_start + self.MAXIMUM_EXECUTOR_LIFETIME.as_(Time.SECONDS):
-        return True
-      if now > last_gc_run + self.MAXIMUM_EXECUTOR_WAIT.as_(Time.SECONDS):
-        return True
-      return self._stop_event.is_set()
-
-    while not should_terminate():
-      try:
-        _, (task, retain_tasks, retain_start) = self._gc_task_queue.popitem(0)
-        last_gc_run = retain_start
-        self._run_gc(task, retain_tasks, retain_start)
-      except KeyError: # no enqueued GC tasks
-        pass
-      if self._driver is not None:
-        self.clean_orphans(self._driver)
-      self._stop_event.wait(self.POLL_WAIT.as_(Time.SECONDS))
-
-    # shutdown
-    if self._driver is not None:
-      try:
-        prev_task_id, _ = self._gc_task_queue.popitem(0)
-      except KeyError: # no enqueued GC tasks
-        pass
-      else:
-        self.send_update(self._driver, prev_task_id, mesos_pb.TASK_FINISHED,
-                         'Garbage collection skipped - GC executor shutting down')
-        # TODO(jon) Remove this once external MESOS-243 is resolved.
-        self.log('Sleeping briefly to mitigate https://issues.apache.org/jira/browse/MESOS-243')
-        self._clock.sleep(self.PERSISTENCE_WAIT.as_(Time.SECONDS))
-
-      self._driver.stop()
-
-  """ Mesos Executor API methods follow """
-
-  def launchTask(self, driver, task):
-    """Queue a new garbage collection run, and drop any currently-enqueued runs."""
-    if self._slave_id is None:
-      self._slave_id = task.slave_id.value
-    task_id = task.task_id.value
-    self.log('launchTask() got task_id: %s' % task_id)
-    if task_id == self._task_id:
-      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
-      return
-    elif task_id in self._gc_task_queue:
-      self.log('=> Already have task_id %s queued - ignoring' % task_id)
-      return
-    try:
-      art = thrift_deserialize(AdjustRetainedTasks(), task.data)
-    except Exception as err:
-      self.log('Error deserializing task: %s' % err)
-      self.send_update(
-          self._driver, task_id, mesos_pb.TASK_FAILED, 'Deserialization of GC task failed')
-      return
-    try:
-      prev_task_id, _ = self._gc_task_queue.popitem(0)
-    except KeyError: # no enqueued GC tasks - reset counter
-      self._dropped_tasks.write(0)
-    else:
-      self.log('=> Dropping previously queued GC with task_id %s' % prev_task_id)
-      self._dropped_tasks.increment()
-      self.log('=> Updating scheduler')
-      self.send_update(self._driver, prev_task_id, mesos_pb.TASK_FINISHED,
-                       'Garbage collection skipped - GC executor received another task')
-    self.log('=> Adding %s to GC queue' % task_id)
-    self._gc_task_queue[task_id] = (task, art.retainedTasks, self._clock.time())
-
-  def killTask(self, driver, task_id):
-    """Remove the specified task from the queue, if it's not yet run. Otherwise, no-op."""
-    self.log('killTask() got task_id: %s' % task_id)
-    task = self._gc_task_queue.pop(task_id, None)
-    if task is not None:
-      self.log('=> Removed %s from queued GC tasks' % task_id)
-    elif task_id == self._task_id:
-      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
-    else:
-      self.log('=> Unknown task_id %s - ignoring' % task_id)
-
-  def shutdown(self, driver):
-    """Trigger the Executor to shut down as soon as the current GC run is finished."""
-    self.log('shutdown() called - setting stop event')
-    self._stop_event.set()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/status_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/status_manager.py b/src/main/python/twitter/aurora/executor/status_manager.py
deleted file mode 100644
index c974597..0000000
--- a/src/main/python/twitter/aurora/executor/status_manager.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import time
-
-from twitter.common import log
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.quantity import Amount, Time
-
-from .common.status_checker import StatusChecker
-
-
-class StatusManager(ExceptionalThread):
-  """
-    An agent that periodically checks the health of a task via StatusCheckers that
-    provide HTTP health checking, resource consumption, etc.
-
-    If any of the status interfaces return a status, the Status Manager
-    invokes the user-supplied callback with the status.
-  """
-  POLL_WAIT = Amount(500, Time.MILLISECONDS)
-
-  def __init__(self, status_checker, callback, clock=time):
-    if not isinstance(status_checker, StatusChecker):
-      raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker))
-    if not callable(callback):
-      raise TypeError('callback needs to be callable!')
-    self._status_checker = status_checker
-    self._callback = callback
-    self._clock = clock
-    super(StatusManager, self).__init__()
-    self.daemon = True
-
-  def run(self):
-    while True:
-      status_result = self._status_checker.status
-      if status_result is not None:
-        log.info('Status manager got %s' % status_result)
-        self._callback(status_result)
-        break
-      else:
-        self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/thermos_executor.py b/src/main/python/twitter/aurora/executor/thermos_executor.py
deleted file mode 100644
index 52b514a..0000000
--- a/src/main/python/twitter/aurora/executor/thermos_executor.py
+++ /dev/null
@@ -1,289 +0,0 @@
-import os
-import threading
-import time
-import traceback
-
-from twitter.common import log
-from twitter.common.concurrent import deadline, defer, Timeout
-from twitter.common.metrics import Observable
-from twitter.common.quantity import Amount, Time
-
-from .common.kill_manager import KillManager
-from .common.sandbox import DirectorySandbox, SandboxProvider
-from .common.status_checker import ChainedStatusChecker, ExitState
-from .common.task_info import (
-    assigned_task_from_mesos_task,
-    mesos_task_instance_from_assigned_task,
-    resolve_ports,
-)
-from .common.task_runner import (
-    TaskError,
-    TaskRunner,
-    TaskRunnerProvider,
-)
-from .executor_base import ThermosExecutorBase
-from .status_manager import StatusManager
-
-import mesos_pb2 as mesos_pb
-
-
-class DefaultSandboxProvider(SandboxProvider):
-  SANDBOX_NAME = 'sandbox'
-
-  def from_assigned_task(self, assigned_task):
-    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
-    return DirectorySandbox(
-        os.path.realpath(self.SANDBOX_NAME),
-        mesos_task.role().get())
-
-
-class ThermosExecutor(Observable, ThermosExecutorBase):
-  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
-  SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES)
-  START_TIMEOUT = Amount(2, Time.MINUTES)
-  STOP_TIMEOUT = Amount(2, Time.MINUTES)
-  STOP_WAIT = Amount(5, Time.SECONDS)
-
-  def __init__(self,
-               runner_provider,
-               status_manager_class=StatusManager,
-               sandbox_provider=DefaultSandboxProvider,
-               status_providers=(),
-               clock=time):
-
-    ThermosExecutorBase.__init__(self)
-    if not isinstance(runner_provider, TaskRunnerProvider):
-      raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' %
-          type(runner_provider))
-    self._runner = None
-    self._runner_provider = runner_provider
-    self._clock = clock
-    self._task_id = None
-    self._status_providers = status_providers
-    self._status_manager = None
-    self._status_manager_class = status_manager_class
-    self._sandbox = None
-    self._sandbox_provider = sandbox_provider()
-    self._kill_manager = KillManager()
-    # Events that are exposed for interested entities
-    self.runner_aborted = threading.Event()
-    self.runner_started = threading.Event()
-    self.sandbox_initialized = threading.Event()
-    self.sandbox_created = threading.Event()
-    self.terminated = threading.Event()
-    self.launched = threading.Event()
-
-  @property
-  def runner(self):
-    return self._runner
-
-  def _die(self, driver, status, msg):
-    log.fatal(msg)
-    self.send_update(driver, self._task_id, status, msg)
-    defer(driver.stop, delay=self.STOP_WAIT)
-
-  def _run(self, driver, assigned_task, mesos_task):
-    """
-      Commence running a Task.
-        - Initialize the sandbox
-        - Start the ThermosTaskRunner (fork the Thermos TaskRunner)
-        - Set up necessary HealthCheckers
-        - Set up DiscoveryManager, if applicable
-        - Set up ResourceCheckpointer
-        - Set up StatusManager, and attach HealthCheckers
-    """
-    self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.')
-
-    if not self._initialize_sandbox(driver, assigned_task):
-      return
-
-    # Fully resolve the portmap
-    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
-
-    # start the process on a separate thread and give the message processing thread back
-    # to the driver
-    try:
-      self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox)
-    except TaskError as e:
-      self._die(driver, mesos_pb.TASK_FAILED, str(e))
-      return
-
-    if not isinstance(self._runner, TaskRunner):
-      self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!')
-      return
-
-    if not self._start_runner(driver, assigned_task, mesos_task, portmap):
-      return
-
-    self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING)
-
-    self._start_status_manager(driver, assigned_task, mesos_task, portmap)
-
-  def _initialize_sandbox(self, driver, assigned_task):
-    self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)
-    self.sandbox_initialized.set()
-    try:
-      deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT,
-               daemon=True, propagate=True)
-    except Timeout:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!')
-      return
-    except self._sandbox.Error as e:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e)
-      return
-    self.sandbox_created.set()
-    return True
-
-  def _start_runner(self, driver, assigned_task, mesos_task, portmap):
-    if self.runner_aborted.is_set():
-      self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.')
-
-    try:
-      deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True)
-    except TaskError as e:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e)
-      return False
-    except Timeout:
-      self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!')
-      return False
-
-    self.runner_started.set()
-    log.debug('Task started.')
-
-    return True
-
-  def _start_status_manager(self, driver, assigned_task, mesos_task, portmap):
-    status_checkers = [self._kill_manager]
-    self.metrics.register_observable('kill_manager', self._kill_manager)
-
-    for status_provider in self._status_providers:
-      status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
-      if status_checker is None:
-        continue
-      status_checkers.append(status_checker)
-      # self.metrics.register_observable()
-
-    self._chained_checker = ChainedStatusChecker(status_checkers)
-    self._chained_checker.start()
-
-    # chain the runner to the other checkers, but do not chain .start()/.stop()
-    complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
-    self._status_manager = self._status_manager_class(
-        complete_checker, self._shutdown, clock=self._clock)
-    self._status_manager.start()
-
-  def _signal_kill_manager(self, driver, task_id, reason):
-    if self._task_id is None:
-      log.error('Was asked to kill task but no task running!')
-      return
-    if task_id != self._task_id:
-      log.error('Asked to kill a task other than what we are running!')
-      return
-    if not self.sandbox_created.is_set():
-      log.error('Asked to kill task with incomplete sandbox - aborting runner start')
-      self.runner_aborted.set()
-      return
-    self.log('Activating kill manager.')
-    self._kill_manager.kill(reason)
-
-  @classmethod
-  def translate_exit_state_to_mesos(cls, exit_state):
-    # Put into executor_base?
-    if exit_state == ExitState.FAILED:
-      return mesos_pb.TASK_FAILED
-    elif exit_state == ExitState.KILLED:
-      return mesos_pb.TASK_KILLED
-    elif exit_state == ExitState.FINISHED:
-      return mesos_pb.TASK_FINISHED
-    elif exit_state == ExitState.LOST:
-      return mesos_pb.TASK_LOST
-    log.error('Unknown exit state, defaulting to TASK_FINISHED.')
-    return mesos_pb.TASK_FINISHED
-
-  def _shutdown(self, status_result):
-    runner_status = self._runner.status
-
-    try:
-      deadline(self._runner.stop, timeout=self.STOP_TIMEOUT)
-    except Timeout:
-      log.error('Failed to stop runner within deadline.')
-
-    try:
-      deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT)
-    except Timeout:
-      log.error('Failed to stop all checkers within deadline.')
-
-    # If the runner was alive when _shutdown was called, defer to the status_result,
-    # otherwise the runner's terminal state is the preferred state.
-    exit_status = runner_status or status_result
-
-    self.send_update(
-        self._driver,
-        self._task_id,
-        self.translate_exit_state_to_mesos(exit_status.status),
-        status_result.reason)
-
-    self.terminated.set()
-    defer(self._driver.stop, delay=self.PERSISTENCE_WAIT)
-
-  """ Mesos Executor API methods follow """
-
-  def launchTask(self, driver, task):
-    """
-      Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks).
-      Note that this task can be realized with a thread, a process, or some simple computation,
-      however, no other callbacks will be invoked on this executor until this callback has returned.
-    """
-    self.launched.set()
-    self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value))
-
-    # TODO(wickman)  Update the tests to call registered(), then remove this line and issue
-    # an assert if self._driver is not populated.
-    self._driver = driver
-
-    if self._runner:
-      log.error('Already running a task! %s' % self._task_id)
-      self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST,
-          "Task already running on this executor: %s" % self._task_id)
-      return
-
-    self._slave_id = task.slave_id.value
-    self._task_id = task.task_id.value
-
-    try:
-      assigned_task = assigned_task_from_mesos_task(task)
-      mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
-    except Exception as e:
-      log.fatal('Could not deserialize AssignedTask')
-      log.fatal(traceback.format_exc())
-      self.send_update(
-          driver, self._task_id, mesos_pb.TASK_FAILED, "Could not deserialize task: %s" % e)
-      defer(driver.stop, delay=self.STOP_WAIT)
-      return
-
-    defer(lambda: self._run(driver, assigned_task, mesos_task))
-
-  def killTask(self, driver, task_id):
-    """
-     Invoked when a task running within this executor has been killed (via
-     SchedulerDriver::killTask). Note that no status update will be sent on behalf of the executor,
-     the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and invoking
-     ExecutorDriver::sendStatusUpdate.
-    """
-    self.log('killTask got task_id: %s' % task_id)
-    self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.")
-    self.log('killTask returned.')
-
-  def shutdown(self, driver):
-    """
-     Invoked when the executor should terminate all of its currently running tasks. Note that after
-     Mesos has determined that an executor has terminated any tasks that the executor did not send
-     terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc) a TASK_LOST
-     status update will be created.
-
-    """
-    self.log('shutdown called')
-    if self._task_id:
-      self.log('shutting down %s' % self._task_id)
-      self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.")
-    self.log('shutdown returned')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/thermos_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/thermos_runner.py b/src/main/python/twitter/aurora/executor/thermos_runner.py
deleted file mode 100644
index 0c9af05..0000000
--- a/src/main/python/twitter/aurora/executor/thermos_runner.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import functools
-import os
-import signal
-import sys
-import traceback
-
-from twitter.common import app, log
-from twitter.thermos.common.options import add_port_to
-from twitter.thermos.common.planner import TaskPlanner
-from twitter.thermos.config.loader import ThermosConfigLoader
-from twitter.thermos.core.runner import TaskRunner
-
-
-app.add_option(
-    "--thermos_json",
-    dest="thermos_json",
-    default=None,
-    help="read a thermos Task from a serialized json blob")
-
-
-app.add_option(
-    "--sandbox",
-    dest="sandbox",
-    metavar="PATH",
-    default=None,
-    help="the sandbox in which this task should run")
-
-
-app.add_option(
-     "--checkpoint_root",
-     dest="checkpoint_root",
-     metavar="PATH",
-     default=None,
-     help="the path where we will store checkpoints")
-
-
-app.add_option(
-     "--task_id",
-     dest="task_id",
-     metavar="STRING",
-     default=None,
-     help="The id to which this task should be bound, created if it does not exist.")
-
-
-app.add_option(
-     "--setuid",
-     dest="setuid",
-     metavar="USER",
-     default=None,
-     help="setuid tasks to this user, requires superuser privileges.")
-
-
-app.add_option(
-     "--enable_chroot",
-     dest="chroot",
-     default=False,
-     action='store_true',
-     help="chroot tasks to the sandbox before executing them.")
-
-
-app.add_option(
-     "--port",
-     type='string',
-     nargs=1,
-     action='callback',
-     callback=add_port_to('prebound_ports'),
-     dest='prebound_ports',
-     default={},
-     metavar = "NAME:PORT",
-     help="bind a numbered port PORT to name NAME")
-
-
-def get_task_from_options(opts):
-  tasks = ThermosConfigLoader.load_json(opts.thermos_json)
-  if len(tasks.tasks()) == 0:
-    app.error("No tasks specified!")
-  if len(tasks.tasks()) > 1:
-    app.error("Multiple tasks in config but no task name specified!")
-  task = tasks.tasks()[0]
-  if not task.task.check().ok():
-    app.error(task.task.check().message())
-  return task
-
-
-def runner_teardown(runner, sig=signal.SIGUSR1, frame=None):
-  """Destroy runner on SIGUSR1 (kill) or SIGUSR2 (lose)"""
-  op = 'kill' if sig == signal.SIGUSR1 else 'lose'
-  log.info('Thermos runner got signal %s, shutting down.' % sig)
-  log.info('Interrupted frame:')
-  if frame:
-    for line in ''.join(traceback.format_stack(frame)).splitlines():
-      log.info(line)
-  runner.close_ckpt()
-  log.info('Calling runner.%s()' % op)
-  getattr(runner, op)()
-  sys.exit(0)
-
-
-class CappedTaskPlanner(TaskPlanner):
-  TOTAL_RUN_LIMIT = 100
-
-
-def proxy_main(args, opts):
-  assert opts.thermos_json and os.path.exists(opts.thermos_json)
-  assert opts.sandbox
-  assert opts.checkpoint_root
-
-  thermos_task = get_task_from_options(opts)
-  prebound_ports = opts.prebound_ports
-  missing_ports = set(thermos_task.ports()) - set(prebound_ports)
-
-  if missing_ports:
-    app.error('ERROR!  Unbound ports: %s' % ' '.join(port for port in missing_ports))
-
-  task_runner = TaskRunner(
-      thermos_task.task,
-      opts.checkpoint_root,
-      opts.sandbox,
-      task_id=opts.task_id,
-      user=opts.setuid,
-      portmap=prebound_ports,
-      chroot=opts.chroot,
-      planner_class=CappedTaskPlanner
-  )
-
-  for sig in (signal.SIGUSR1, signal.SIGUSR2):
-    signal.signal(sig, functools.partial(runner_teardown, task_runner))
-
-  try:
-    task_runner.run()
-  except TaskRunner.InternalError as err:
-    app.error('Internal error: %s' % err)
-  except TaskRunner.InvalidTask as err:
-    app.error(str(err))
-  except TaskRunner.StateError:
-    app.error('Task appears to already be in a terminal state.')
-  except KeyboardInterrupt:
-    runner_teardown(task_runner)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/thermos_task_runner.py b/src/main/python/twitter/aurora/executor/thermos_task_runner.py
deleted file mode 100644
index dba99d2..0000000
--- a/src/main/python/twitter/aurora/executor/thermos_task_runner.py
+++ /dev/null
@@ -1,331 +0,0 @@
-import errno
-import getpass
-import os
-import signal
-import subprocess
-import threading
-import time
-
-from twitter.aurora.common.http_signaler import HttpSignaler
-from twitter.common import log
-from twitter.common.dirutil import chmod_plus_x, safe_mkdtemp
-from twitter.common.log.options import LogOptions
-from twitter.common.quantity import Amount, Time
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.config.loader import ThermosTaskWrapper
-from twitter.thermos.core import runner as core
-from twitter.thermos.monitoring.monitor import TaskMonitor
-
-from gen.twitter.thermos.ttypes import TaskState
-
-from .common.status_checker import ExitState, StatusResult
-from .common.task_info import (
-    mesos_task_instance_from_assigned_task,
-    resolve_ports,
-)
-from .common.task_runner import (
-    TaskError,
-    TaskRunner,
-    TaskRunnerProvider,
-)
-
-
class ThermosTaskRunner(TaskRunner):
  """TaskRunner that forks the thermos_runner pex and supervises it.

  The runner is launched as a subprocess; task state is observed out of
  band through the thermos checkpoint stream via a TaskMonitor.  Teardown
  is escalated in stages: HTTP quit/abort endpoints first (if a 'health'
  port is bound), then SIGUSR1/SIGUSR2 to the runner process, and finally
  a direct rebind to the task's process tree (quitquitquit).
  """

  # Pause between teardown escalation passes.
  ESCALATION_WAIT = Amount(5, Time.SECONDS)
  # Maps thermos TaskStates observed after runner death to executor-level
  # StatusResults.  ACTIVE here means the runner died mid-task => LOST.
  EXIT_STATE_MAP = {
      TaskState.ACTIVE: StatusResult('Runner died while task was active.', ExitState.LOST),
      TaskState.FAILED: StatusResult('Task failed.', ExitState.FAILED),
      TaskState.KILLED: StatusResult('Task killed.', ExitState.KILLED),
      TaskState.LOST: StatusResult('Task lost.', ExitState.LOST),
      TaskState.SUCCESS: StatusResult('Task finished.', ExitState.FINISHED),
  }
  # Default deadlines/intervals; subclasses may override (see
  # DefaultThermosTaskRunnerProvider, which specializes these per provider).
  MAX_WAIT = Amount(1, Time.MINUTES)
  PEX_NAME = 'thermos_runner.pex'
  POLL_INTERVAL = Amount(500, Time.MILLISECONDS)
  THERMOS_PREEMPTION_WAIT = Amount(1, Time.MINUTES)

  def __init__(self,
               runner_pex,
               task_id,
               task,
               role,
               portmap,
               sandbox,
               checkpoint_root=None,
               artifact_dir=None,
               clock=time):
    """
      runner_pex       location of the thermos_runner pex that this task runner should use
      task_id          task_id assigned by scheduler
      task             thermos pystachio Task object
      role             role to run the task under
      portmap          { name => port } dictionary
      sandbox          the sandbox object
      checkpoint_root  the checkpoint root for the thermos runner
      artifact_dir     scratch space for the thermos runner (basically cwd of thermos.pex)
      clock            clock

    Raises TaskError if the task cannot be serialized to disk.
    """
    self._runner_pex = runner_pex
    self._task_id = task_id
    self._task = task
    self._popen = None
    self._monitor = None
    self._status = None
    self._ports = portmap
    self._root = sandbox.root
    self._checkpoint_root = checkpoint_root or TaskPath.DEFAULT_CHECKPOINT_ROOT
    self._enable_chroot = sandbox.chrooted
    self._role = role
    self._clock = clock
    self._artifact_dir = artifact_dir or safe_mkdtemp()

    # wait events
    self._dead = threading.Event()          # set once runner termination is detected
    self._kill_signal = threading.Event()   # set once a kill/lose has been issued
    self.forking = threading.Event()        # set when start() begins
    self.forked = threading.Event()         # set once the runner subprocess exists

    # Serialize the task config to the artifact dir so the runner pex can
    # load it via --thermos_json.
    try:
      with open(os.path.join(self._artifact_dir, 'task.json'), 'w') as fp:
        self._task_filename = fp.name
        ThermosTaskWrapper(self._task).to_file(self._task_filename)
    except ThermosTaskWrapper.InvalidTask as e:
      raise TaskError('Failed to load task: %s' % e)

  def _terminate_http(self):
    """Ask the task to exit via its health port, escalating quit -> abort.

    Returns True if the runner reached a terminal status after either pass;
    returns None otherwise (including when no 'health' port is bound).
    """
    if 'health' not in self._ports:
      return

    http_signaler = HttpSignaler(self._ports['health'])

    # pass 1
    http_signaler.quitquitquit()
    self._clock.sleep(self.ESCALATION_WAIT.as_(Time.SECONDS))
    if self.status is not None:
      return True

    # pass 2
    http_signaler.abortabortabort()
    self._clock.sleep(self.ESCALATION_WAIT.as_(Time.SECONDS))
    if self.status is not None:
      return True

  @property
  def artifact_dir(self):
    # Scratch directory handed to the runner (cwd of the forked pex).
    return self._artifact_dir

  def task_state(self):
    """Current thermos TaskState from the checkpoint monitor, or None if
    start() has not yet created the monitor."""
    return self._monitor.task_state() if self._monitor else None

  @property
  def is_alive(self):
    """
      Is the process underlying the Thermos task runner alive?
    """
    if not self._popen:
      return False
    if self._dead.is_set():
      return False

    # N.B. You cannot mix this code and any code that relies upon os.wait
    # mechanisms with blanket child process collection.  One example is the
    # Thermos task runner which calls os.wait4 -- without refactoring, you
    # should not mix a Thermos task runner in the same process as this
    # thread.
    try:
      pid, _ = os.waitpid(self._popen.pid, os.WNOHANG)
      if pid == 0:
        return True
      else:
        log.info('Detected runner termination: pid=%s' % pid)
    except OSError as e:
      log.error('is_alive got OSError: %s' % e)
      # ECHILD means the child was already reaped elsewhere -- treat as dead.
      if e.errno != errno.ECHILD:
        raise

    # Latch death so we never waitpid on an already-reaped child again.
    self._dead.set()
    return False

  def compute_status(self):
    """Map the current task state to a StatusResult once the runner has died.

    Returns None while the runner process is still alive, or when the
    monitor reports a state with no EXIT_STATE_MAP entry.
    """
    if self.is_alive:
      return None
    exit_state = self.EXIT_STATE_MAP.get(self.task_state())
    if exit_state is None:
      log.error('Received unexpected exit state from TaskMonitor.')
    return exit_state

  def terminate_runner(self, as_loss=False):
    """
      Terminate the underlying runner process, if it exists.

      as_loss  if True, signal SIGUSR2 (mark task lost) instead of
               SIGUSR1 (ordinary kill).
    """
    # Idempotence guard: only the first kill/lose attempt does anything.
    if self._kill_signal.is_set():
      log.warning('Duplicate kill/lose signal received, ignoring.')
      return
    self._kill_signal.set()
    if self.is_alive:
      sig = 'SIGUSR2' if as_loss else 'SIGUSR1'
      log.info('Runner is alive, sending %s' % sig)
      try:
        self._popen.send_signal(getattr(signal, sig))
      except OSError as e:
        # Best effort: the runner may have died between the check and the
        # signal delivery.
        log.error('Got OSError sending %s: %s' % (sig, e))
    else:
      log.info('Runner is dead, skipping kill.')

  def kill(self):
    """Request ordinary task termination (SIGUSR1)."""
    self.terminate_runner()

  def lose(self):
    """Request task termination marking the task as lost (SIGUSR2)."""
    self.terminate_runner(as_loss=True)

  def quitquitquit(self):
    """Bind to the process tree of a Thermos task and kill it with impunity."""
    try:
      runner = core.TaskRunner.get(self._task_id, self._checkpoint_root)
      if runner:
        log.info('quitquitquit calling runner.kill')
        # Right now preemption wait is hardcoded, though it may become configurable in the future.
        runner.kill(force=True, preemption_wait=self.THERMOS_PREEMPTION_WAIT)
      else:
        log.error('Could not instantiate runner!')
    except core.TaskRunner.Error as e:
      log.error('Could not quitquitquit runner: %s' % e)

  def _cmdline(self):
    """Assemble the argv list used to fork the thermos_runner pex."""
    params = dict(log_dir=LogOptions.log_dir(),
                  log_to_disk='DEBUG',
                  checkpoint_root=self._checkpoint_root,
                  sandbox=self._root,
                  task_id=self._task_id,
                  thermos_json=self._task_filename)

    # Only root may setuid to the task's role.
    if getpass.getuser() == 'root':
      params.update(setuid=self._role)

    cmdline_args = [self._runner_pex]
    cmdline_args.extend('--%s=%s' % (flag, value) for flag, value in params.items())
    if self._enable_chroot:
      cmdline_args.extend(['--enable_chroot'])
    for name, port in self._ports.items():
      cmdline_args.extend(['--port=%s:%s' % (name, port)])
    return cmdline_args

  # --- public interface
  def start(self, timeout=MAX_WAIT):
    """Fork the task runner and return once the underlying task is running, up to timeout.

    Raises TaskError if the runner cannot be made executable, cannot be
    forked, or the task does not reach an active/finished state in time.
    """
    self.forking.set()

    try:
      chmod_plus_x(self._runner_pex)
    except OSError as e:
      # EPERM is tolerated (e.g. pex not owned by us but already executable).
      if e.errno != errno.EPERM:
        raise TaskError('Failed to chmod +x runner: %s' % e)

    self._monitor = TaskMonitor(TaskPath(root=self._checkpoint_root), self._task_id)

    cmdline_args = self._cmdline()
    log.info('Forking off runner with cmdline: %s' % ' '.join(cmdline_args))

    try:
      self._popen = subprocess.Popen(cmdline_args)
    except OSError as e:
      raise TaskError(e)

    self.forked.set()

    log.debug('Waiting for task to start.')

    def is_started():
      return self._monitor and (self._monitor.active or self._monitor.finished)

    # Poll the checkpoint stream until the task starts or the deadline hits.
    waited = Amount(0, Time.SECONDS)
    while not is_started() and waited < timeout:
      log.debug('  - sleeping...')
      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
      waited += self.POLL_INTERVAL

    if not is_started():
      log.error('Task did not start within deadline, forcing loss.')
      self.lose()
      raise TaskError('Task did not start within deadline.')

  def stop(self, timeout=MAX_WAIT):
    """Stop the runner.  If it's already completed, no-op.  If it's still running, issue a kill.

    Raises TaskError if start() was never called or the task fails to stop
    within the (shared) timeout.
    """
    log.info('ThermosTaskRunner is shutting down.')

    if not self.forking.is_set():
      raise TaskError('Failed to call TaskRunner.start.')

    # Escalation stage 1: polite HTTP teardown via the health port.
    log.info('Invoking runner HTTP teardown.')
    self._terminate_http()

    # Escalation stage 2: SIGUSR1 to the runner process.
    log.info('Invoking runner.kill')
    self.kill()

    waited = Amount(0, Time.SECONDS)
    while self.is_alive and waited < timeout:
      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
      waited += self.POLL_INTERVAL

    if not self.is_alive and self.task_state() != TaskState.ACTIVE:
      return

    # Escalation stage 3: rebind directly to the task and force-kill it.
    log.info('Thermos task did not shut down cleanly, rebinding to kill.')
    self.quitquitquit()

    # NOTE: `waited` intentionally carries over -- the timeout is a single
    # deadline shared across both waiting loops.
    while not self._monitor.finished and waited < timeout:
      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
      waited += self.POLL_INTERVAL

    if not self._monitor.finished:
      raise TaskError('Task did not stop within deadline.')

  @property
  def status(self):
    """Return the StatusResult of this task runner.  This returns None as
       long as no terminal state is reached."""
    # Memoized: once a terminal status is computed it never changes.
    if self._status is None:
      self._status = self.compute_status()
    return self._status
-
-
class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
  """Builds ThermosTaskRunner instances from Mesos assigned tasks.

  Timing knobs supplied here (max_wait, preemption_wait, poll_interval)
  are baked into a runner subclass created per assigned task.
  """

  def __init__(self,
               pex_location,
               checkpoint_root=None,
               artifact_dir=None,
               task_runner_class=ThermosTaskRunner,
               max_wait=Amount(1, Time.MINUTES),
               preemption_wait=Amount(1, Time.MINUTES),
               poll_interval=Amount(500, Time.MILLISECONDS),
               clock=time):
    self._pex_location = pex_location
    self._checkpoint_root = checkpoint_root
    self._artifact_dir = artifact_dir or safe_mkdtemp()
    self._task_runner_class = task_runner_class
    self._max_wait = max_wait
    self._preemption_wait = preemption_wait
    self._poll_interval = poll_interval
    self._clock = clock

  def from_assigned_task(self, assigned_task, sandbox):
    """Instantiate a configured task runner for assigned_task in sandbox."""
    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)

    # Specialize the runner class with this provider's timing parameters.
    class ProvidedThermosTaskRunner(self._task_runner_class):
      MAX_WAIT = self._max_wait
      POLL_INTERVAL = self._poll_interval
      THERMOS_PREEMPTION_WAIT = self._preemption_wait

    return ProvidedThermosTaskRunner(
        self._pex_location,
        assigned_task.taskId,
        mesos_task.task(),
        assigned_task.task.owner.role,
        portmap,
        sandbox,
        checkpoint_root=self._checkpoint_root,
        artifact_dir=self._artifact_dir,
        clock=self._clock)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/tools/java/organize_imports.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/tools/java/organize_imports.py b/src/main/python/twitter/aurora/tools/java/organize_imports.py
deleted file mode 100644
index 48a0821..0000000
--- a/src/main/python/twitter/aurora/tools/java/organize_imports.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#!/bin/env python
-'''
-Organizes a java source file's import statements in a way that pleases Twitter's checkstyle
-configuration.
-This expects exactly one argument: the name of the file to modify with preferred import
-ordering.
-'''
-
-from __future__ import print_function
-
-import re
-import sys
-from collections import defaultdict
-
-
IMPORT_RE = re.compile('import(?: static)? (.*);')


def get_group(import_statement):
  """Map a java import statement to its checkstyle group name.

  Anything under com.twitter forms its own group; every other import is
  grouped by its top-level package (e.g. 'java', 'org').
  """
  match = IMPORT_RE.match(import_statement)
  assert match, 'Could not parse import statement: %s' % import_statement
  class_name = match.group(1)
  return 'com.twitter' if class_name.startswith('com.twitter') else class_name.split('.')[0]
-
-
def index_by_group(import_statements):
  """Bucket import statements into a {group_name: [statements]} mapping."""
  grouped = defaultdict(list)
  for statement in import_statements:
    grouped[get_group(statement)].append(statement)
  return grouped
-
-
IMPORT_CLASS_RE = re.compile(
    r'import(?: static)? (?P<outer>[^A-Z]*[A-Z]\w+)(?:\.(?P<inners>[\w][^;]*))?')


def get_all_group_lines(import_groups):
  """Flatten {group: [import lines]} into the final, ordered line list.

  Well-known groups come first in checkstyle's preferred order, then any
  remaining groups alphabetically.  Each group's imports are sorted and
  followed by a single blank separator line.  Returns [] when there are
  no imports at all.
  """
  from functools import cmp_to_key  # local: stdlib, only needed when sorting

  if not import_groups:
    return []

  def comparator(x, y):
    # These shenanigans are used to properly order imports for inner classes.
    # So we get ordering like:
    # import com.foo.Bar;
    # import com.foo.Bar.Baz;
    # (this is not lexicographical, so normal sort won't suffice)
    x_m = IMPORT_CLASS_RE.match(x)
    y_m = IMPORT_CLASS_RE.match(y)
    if x_m.group('outer') == y_m.group('outer'):
      # Normalize a missing inner class to '' so it sorts before any named
      # inner class (preserves Python 2's None-sorts-first behavior and
      # avoids a None/str comparison TypeError on Python 3).
      left, right = x_m.group('inners') or '', y_m.group('inners') or ''
    else:
      left, right = x, y
    return (left > right) - (left < right)

  def get_group_lines(group):
    # key=cmp_to_key(...) is portable; the two-argument comparator form of
    # sorted() used previously only exists on Python 2.
    lines = sorted(import_groups[group], key=cmp_to_key(comparator))
    lines.append('')
    return lines

  all_lines = []
  explicit_groups = ['java', 'javax', 'scala', 'com', 'net', 'org', 'com.twitter']
  for group in explicit_groups:
    if group in import_groups:
      all_lines += get_group_lines(group)

  # Gather remaining groups.
  remaining_groups = sorted(set(import_groups.keys()) - set(explicit_groups))
  for group in remaining_groups:
    all_lines += get_group_lines(group)
  return all_lines
-
-
# This module runs as a script: it expects exactly one argument, the java
# source file whose imports will be reorganized in place.
if len(sys.argv) != 2:
  print('usage: %s FILE' % sys.argv[0])
  sys.exit(1)

# States for the line-scanning state machine below.
BEFORE_IMPORTS = 'before_imports'
IMPORTS = 'imports'
STATIC_IMPORTS = 'static_imports'
AFTER_IMPORTS = 'after_imports'

print('Organizing imports in %s' % sys.argv[1])
lines_before_imports = []   # file header (package decl, comments) before imports
import_lines = []           # plain `import ...;` statements
static_import_lines = []    # `import static ...;` statements
lines_after_imports = []    # everything after the import section
with open(sys.argv[1], 'r') as f:
  position = BEFORE_IMPORTS
  for line in f:
    line = line.rstrip()
    # NOTE: the state checks below are successive `if`s (not `elif`) on
    # purpose: a line that triggers a transition falls through and is also
    # handled by the state it just entered (e.g. the first import line both
    # switches state and gets collected).
    if position == BEFORE_IMPORTS:
      if line.startswith('import'):
        position = IMPORTS
      else:
        lines_before_imports.append(line)
    if position == IMPORTS:
      if line.startswith('import static'):
        position = STATIC_IMPORTS
      elif line.startswith('import'):
        import_lines.append(line)
      elif line.strip():
        # First non-blank, non-import line ends the import section.
        position = AFTER_IMPORTS
    if position == STATIC_IMPORTS:
      if line.startswith('import static'):
        static_import_lines.append(line)
      elif line.strip():
        position = AFTER_IMPORTS
    if position == AFTER_IMPORTS:
      lines_after_imports.append(line)

# Bucket the collected imports by checkstyle group for reordering below.
import_groups = index_by_group(import_lines)
static_import_groups = index_by_group(static_import_lines)
-
def ensure_line_padding(lines):
  """Guarantee *lines* ends with a blank line, appending one in place if needed.

  Returns the same list object for call-site convenience.
  """
  needs_padding = bool(lines) and lines[-1] != ''
  if needs_padding:
    lines.append('')
  return lines
-
# Reassemble the file: original header, then regular imports, then static
# imports, then the remainder, with a blank line separating each section.
file_lines = lines_before_imports
if import_groups:
  ensure_line_padding(file_lines)
  file_lines += get_all_group_lines(import_groups)
if static_import_groups:
  ensure_line_padding(file_lines)
  file_lines += get_all_group_lines(static_import_groups)
if lines_after_imports:
  ensure_line_padding(file_lines)
  file_lines += lines_after_imports

# Rewrite the source file in place with the reorganized lines.
with open(sys.argv[1], 'w') as f:
  for line in file_lines:
    print(line, file=f)


Mime
View raw message