aurora-commits mailing list archives

From ma...@apache.org
Subject [2/2] aurora git commit: Removing GC executor code.
Date Thu, 25 Jun 2015 20:49:36 GMT
Removing GC executor code.

Bugs closed: AURORA-1333

Reviewed at https://reviews.apache.org/r/35813/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/56bb1e69
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/56bb1e69
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/56bb1e69

Branch: refs/heads/master
Commit: 56bb1e693db4312892f425ab56108ac3e6e00086
Parents: d977aa4
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Jun 25 13:49:15 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Jun 25 13:49:15 2015 -0700

----------------------------------------------------------------------
 NEWS                                            |   1 +
 api/src/main/thrift/org/apache/aurora/gen/BUILD |   1 -
 .../org/apache/aurora/gen/internal_rpc.thrift   |  28 -
 debian/aurora-executor.install                  |   1 -
 debian/rules                                    |   1 -
 docs/test-resource-generation.md                |   5 +-
 examples/vagrant/aurorabuild.sh                 |   1 -
 src/main/python/apache/aurora/executor/BUILD    |  31 -
 .../python/apache/aurora/executor/bin/BUILD     |  29 -
 .../aurora/executor/bin/gc_executor_main.py     |  77 ---
 .../apache/aurora/executor/gc_executor.py       | 574 ----------------
 .../python/apache/thermos/cli/commands/BUILD    |  12 -
 .../python/apache/thermos/cli/commands/gc.py    | 105 ---
 src/main/python/apache/thermos/cli/main.py      |   2 -
 src/main/python/apache/thermos/core/BUILD       |  13 -
 src/main/python/apache/thermos/core/helper.py   |  18 -
 .../python/apache/thermos/core/inspector.py     | 115 ----
 src/main/python/apache/thermos/monitoring/BUILD |  14 -
 .../python/apache/thermos/monitoring/garbage.py | 198 ------
 src/test/python/apache/aurora/executor/BUILD    |  19 -
 .../python/apache/aurora/executor/bin/BUILD     |  10 -
 .../bin/test_gc_executor_entry_point.py         |  40 --
 .../apache/aurora/executor/test_gc_executor.py  | 656 -------------------
 .../apache/thermos/cli/commands/test_import.py  |   2 -
 src/test/python/apache/thermos/monitoring/BUILD |  14 -
 .../apache/thermos/monitoring/test_garbage.py   |  90 ---
 26 files changed, 3 insertions(+), 2054 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index a17f0e7..1a0fb48 100644
--- a/NEWS
+++ b/NEWS
@@ -2,3 +2,4 @@
 -----
 
 - Now requires JRE 8 or greater.
+- GC executor is fully replaced by the task state reconciliation (AURORA-1047).
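
For context, the task state reconciliation that replaces the GC executor is driven from the
scheduler side: the scheduler asks the Mesos master for the authoritative status of each task
and reacts to the resulting status updates. A minimal sketch, assuming the Mesos Python
bindings used elsewhere in this repository (Aurora's production scheduler is Java, and
request_reconciliation is an illustrative name, not Aurora code):

from mesos.interface import mesos_pb2


def request_reconciliation(driver, task_ids):
  """Ask the master to re-send the authoritative status of each task.

  Results arrive asynchronously through Scheduler.statusUpdate().
  """
  statuses = []
  for task_id in task_ids:
    status = mesos_pb2.TaskStatus()
    status.task_id.value = task_id
    # TaskStatus.state is a required proto field; the master supplies the
    # actual state in its reply.
    status.state = mesos_pb2.TASK_STAGING
    statuses.append(status)
  driver.reconcileTasks(statuses)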

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/api/src/main/thrift/org/apache/aurora/gen/BUILD
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/BUILD b/api/src/main/thrift/org/apache/aurora/gen/BUILD
index fe3f83b..d196fef 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/BUILD
+++ b/api/src/main/thrift/org/apache/aurora/gen/BUILD
@@ -20,7 +20,6 @@ python_thrift_library(
   name = 'py-thrift',
   sources = [
     'api.thrift',
-    'internal_rpc.thrift',
   ],
 )
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift b/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift
deleted file mode 100644
index a2c230f..0000000
--- a/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace java org.apache.aurora.gen.comm
-namespace py gen.apache.aurora.comm
-
-include "api.thrift"
-
-// Thrift interface to define the communication between the scheduler and executor.
-
-// Message sent from the scheduler to the executor, indicating that some
-// task history associated with the host may have been purged, and the
-// executor should only retain tasks associated with the provided tasks IDs.
-struct AdjustRetainedTasks {
-  2: map<string, api.ScheduleStatus> retainedTasks  // All tasks that the executor should
-                                                    // retain, and their statuses.
-}
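
For reference, this struct rode inside the Mesos TaskInfo.data payload: the scheduler
thrift-serialized an AdjustRetainedTasks and the GC executor decoded it in launchTask()
(see gc_executor.py below). A round-trip sketch using the generated Python bindings
(the task ids are made-up examples):

from thrift.TSerialization import deserialize, serialize

from gen.apache.aurora.api.ttypes import ScheduleStatus
from gen.apache.aurora.comm.ttypes import AdjustRetainedTasks

msg = AdjustRetainedTasks(retainedTasks={
    'task-1': ScheduleStatus.RUNNING,   # executor should retain this task
    'task-2': ScheduleStatus.FINISHED,  # ...and this one, as terminal
})
payload = serialize(msg)  # bytes carried in the TaskInfo.data field
decoded = deserialize(AdjustRetainedTasks(), payload)
assert decoded.retainedTasks['task-1'] == ScheduleStatus.RUNNING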

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/debian/aurora-executor.install
----------------------------------------------------------------------
diff --git a/debian/aurora-executor.install b/debian/aurora-executor.install
index 8efb130..5d0d1f7 100644
--- a/debian/aurora-executor.install
+++ b/debian/aurora-executor.install
@@ -1,4 +1,3 @@
-dist/gc_executor.pex /usr/share/aurora/bin
 dist/thermos_executor.pex /usr/share/aurora/bin
 dist/thermos_observer.pex /usr/share/aurora/bin
 dist/thermos.pex /usr/share/aurora/bin

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/debian/rules
----------------------------------------------------------------------
diff --git a/debian/rules b/debian/rules
index 6ba18ce..db0f14e 100755
--- a/debian/rules
+++ b/debian/rules
@@ -33,7 +33,6 @@ __pants_build:
 	mkdir -p third_party
 	$(pants) binary src/main/python/apache/aurora/admin:kaurora_admin
 	$(pants) binary src/main/python/apache/aurora/client/cli:kaurora
-	$(pants) binary src/main/python/apache/aurora/executor/bin:gc_executor
 	$(pants) binary src/main/python/apache/aurora/executor/bin:thermos_executor
 	$(pants) binary src/main/python/apache/thermos/cli/bin:thermos
 	$(pants) binary src/main/python/apache/thermos/bin:thermos_runner

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/docs/test-resource-generation.md
----------------------------------------------------------------------
diff --git a/docs/test-resource-generation.md b/docs/test-resource-generation.md
index 335586d..e78e742 100644
--- a/docs/test-resource-generation.md
+++ b/docs/test-resource-generation.md
@@ -4,9 +4,8 @@
 The Aurora source repository and distributions contain several
 [binary files](../src/test/resources/org/apache/thermos/root/checkpoints) to
 qualify the backwards-compatibility of thermos with checkpoint data. Since
-thermos persists state to disk, to be read by other components (the GC executor
-and the thermos observer), it is important that we have tests that prevent
-regressions affecting the ability to parse previously-written data.
+thermos persists state to disk, to be read by the thermos observer, it is important that we have
+tests that prevent regressions affecting the ability to parse previously-written data.
 
 ## Generating test files
 The files included represent persisted checkpoints that exercise different

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/examples/vagrant/aurorabuild.sh
----------------------------------------------------------------------
diff --git a/examples/vagrant/aurorabuild.sh b/examples/vagrant/aurorabuild.sh
index 5eb171c..fbaa6ae 100755
--- a/examples/vagrant/aurorabuild.sh
+++ b/examples/vagrant/aurorabuild.sh
@@ -58,7 +58,6 @@ function build_scheduler {
 }
 
 function build_executor {
-  ./pants binary src/main/python/apache/aurora/executor/bin:gc_executor
   ./pants binary src/main/python/apache/aurora/executor/bin:thermos_executor
   ./pants binary src/main/python/apache/thermos/bin:thermos_runner
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/BUILD b/src/main/python/apache/aurora/executor/BUILD
index cbb2f5f..52be02b 100644
--- a/src/main/python/apache/aurora/executor/BUILD
+++ b/src/main/python/apache/aurora/executor/BUILD
@@ -95,36 +95,6 @@ python_library(
   ]
 )
 
-
-python_library(
-  name = 'gc_executor',
-  sources = ['gc_executor.py'],
-  dependencies = [
-    ':executor_base',
-    '3rdparty/python:mesos.interface',
-    # To prevent an alpha version of protobuf from being pulled down by pants we
-    # specify protobuf here. See AURORA-1128 for more details.
-    '3rdparty/python:protobuf',
-    '3rdparty/python:psutil',
-    '3rdparty/python:twitter.common.collections',
-    '3rdparty/python:twitter.common.exceptions',
-    '3rdparty/python:twitter.common.log',
-    '3rdparty/python:twitter.common.metrics',
-    '3rdparty/python:twitter.common.quantity',
-    'src/main/python/apache/thermos/common:ckpt',
-    'src/main/python/apache/thermos/common:path',
-    'src/main/python/apache/thermos/core:helper',
-    'src/main/python/apache/thermos/core:inspector',
-    'src/main/python/apache/thermos/monitoring:detector',
-    'src/main/python/apache/thermos/monitoring:garbage',
-    'src/main/python/apache/aurora/config:schema',
-    'src/main/python/apache/aurora/executor/common:executor_detector',
-    'src/main/python/apache/aurora/executor/common:sandbox',
-    'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
-    'api/src/main/thrift/org/apache/thermos:py-thrift',
-  ]
-)
-
 python_library(
   name = 'executor-packaged',
   dependencies = [
@@ -140,7 +110,6 @@ python_library(
     name = 'apache.aurora.executor',
     version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().upper(),
   ).with_binaries(
-    gc_executor = 'src/main/python/apache/aurora/executor/bin:gc_executor',
     thermos_executor = 'src/main/python/apache/aurora/executor/bin:thermos_executor',
   )
 )

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 0fbb0f8..b23429a 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -45,32 +45,3 @@ python_binary(
     ':thermos_executor_source',
   ]
 )
-
-python_library(
-  name = 'gc_executor_source',
-  sources = ['gc_executor_main.py'],
-  dependencies = [
-    # To prevent an alpha version of protobuf from being pulled down by pants we
-    # specify protobuf here. See AURORA-1128 for more details.
-    '3rdparty/python:protobuf',
-    '3rdparty/python:twitter.common.app',
-    '3rdparty/python:twitter.common.log',
-    '3rdparty/python:twitter.common.metrics',
-    'src/main/python/apache/thermos/common:constants',
-    'src/main/python/apache/thermos/monitoring:detector',
-    'src/main/python/apache/aurora/executor/common:executor_detector',
-    'src/main/python/apache/aurora/executor/common:path_detector',
-    'src/main/python/apache/aurora/executor:executor_vars',
-    'src/main/python/apache/aurora/executor:gc_executor',
-  ]
-)
-
-python_binary(
-  name = 'gc_executor',
-  entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main',
-  always_write_cache = True,
-  dependencies = [
-    '3rdparty/python:mesos.native',
-    ':gc_executor_source',
-  ]
-)

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
deleted file mode 100644
index 8093717..0000000
--- a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
+++ /dev/null
@@ -1,77 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Command-line entry point to the Thermos GC executor
-
-This module wraps the Thermos GC executor into an executable suitable for launching by a Mesos
-slave.
-
-"""
-
-try:
-  from mesos.native import MesosExecutorDriver
-except ImportError:
-  MesosExecutorDriver = None
-
-from twitter.common import app, log
-from twitter.common.log.options import LogOptions
-from twitter.common.metrics.sampler import DiskMetricWriter
-
-from apache.aurora.executor.common.executor_detector import ExecutorDetector
-from apache.aurora.executor.common.path_detector import MesosPathDetector
-from apache.aurora.executor.gc_executor import ThermosGCExecutor
-from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
-from apache.thermos.monitoring.detector import ChainedPathDetector, FixedPathDetector
-
-app.configure(debug=True)
-
-
-# locate logs locally in executor sandbox
-LogOptions.set_simple(True)
-LogOptions.set_disk_log_level('DEBUG')
-LogOptions.set_log_dir(ExecutorDetector.LOG_PATH)
-
-
-def initialize():
-  path_detector = ChainedPathDetector(
-      FixedPathDetector(DEFAULT_CHECKPOINT_ROOT),
-      MesosPathDetector(),
-  )
-
-  # Create executor stub
-  thermos_gc_executor = ThermosGCExecutor(path_detector)
-
-  # Create metrics collector
-  metric_writer = DiskMetricWriter(thermos_gc_executor.metrics, ExecutorDetector.VARS_PATH)
-
-  # Create driver stub
-  driver = MesosExecutorDriver(thermos_gc_executor)
-
-  return thermos_gc_executor, metric_writer, driver
-
-
-def proxy_main():
-  def main():
-    if MesosExecutorDriver is None:
-      app.error('Could not load MesosExecutorDriver!')
-
-    thermos_gc_executor, metric_writer, driver = initialize()
-
-    thermos_gc_executor.start()
-    metric_writer.start()
-    driver.run()
-
-    log.info('MesosExecutorDriver.run() has finished.')
-
-  app.main()

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/gc_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py
deleted file mode 100644
index d4392fa..0000000
--- a/src/main/python/apache/aurora/executor/gc_executor.py
+++ /dev/null
@@ -1,574 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Thermos garbage-collection (GC) executor
-
-This module contains the Thermos GC executor, responsible for garbage collecting old tasks and
-reconciling task states with the Mesos scheduler. It is intended to be run periodically on Mesos
-slaves utilising the Thermos executor.
-
-"""
-
-import os
-import threading
-import time
-from collections import namedtuple
-
-import psutil
-from mesos.interface import mesos_pb2
-from thrift.TSerialization import deserialize as thrift_deserialize
-from twitter.common.collections import OrderedDict
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.metrics import Observable
-from twitter.common.metrics.gauge import AtomicGauge
-from twitter.common.quantity import Amount, Time
-
-from apache.thermos.common.ckpt import CheckpointDispatcher
-from apache.thermos.common.path import TaskPath
-from apache.thermos.core.helper import TaskKiller
-from apache.thermos.core.inspector import CheckpointInspector
-from apache.thermos.monitoring.detector import PathDetector, TaskDetector
-from apache.thermos.monitoring.garbage import TaskGarbageCollector
-
-from .common.executor_detector import ExecutorDetector
-from .common.sandbox import DirectorySandbox
-from .executor_base import ExecutorBase
-
-from gen.apache.aurora.api.constants import TERMINAL_STATES
-from gen.apache.aurora.api.ttypes import ScheduleStatus
-from gen.apache.aurora.comm.ttypes import AdjustRetainedTasks
-from gen.apache.thermos.ttypes import TaskState
-
-THERMOS_TO_TWITTER_STATES = {
-    TaskState.ACTIVE: ScheduleStatus.RUNNING,
-    TaskState.CLEANING: ScheduleStatus.RUNNING,
-    TaskState.FINALIZING: ScheduleStatus.RUNNING,
-    TaskState.SUCCESS: ScheduleStatus.FINISHED,
-    TaskState.FAILED: ScheduleStatus.FAILED,
-    TaskState.KILLED: ScheduleStatus.KILLED,
-    TaskState.LOST: ScheduleStatus.LOST,
-}
-
-
-THERMOS_TO_MESOS_STATES = {
-    TaskState.ACTIVE: mesos_pb2.TASK_RUNNING,
-    TaskState.SUCCESS: mesos_pb2.TASK_FINISHED,
-    TaskState.FAILED: mesos_pb2.TASK_FAILED,
-    TaskState.KILLED: mesos_pb2.TASK_KILLED,
-    TaskState.LOST: mesos_pb2.TASK_LOST,
-}
-
-
-# RootedTask is a (checkpoint_root, task_id) tuple.  Before, checkpoint_root was assumed to be a
-# globally defined location e.g. '/var/run/thermos'.  We are trying to move checkpoints into
-sandboxes, which means that each task will have its own checkpoint_root, so we can no longer rely
-# upon a single invariant checkpoint root passed into the GC executor constructor.
-RootedTask = namedtuple('RootedTask', 'checkpoint_root task_id')
-
-
-class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
-  """
-    Thermos GC Executor, responsible for:
-      - garbage collecting old tasks to make sure they don't clutter up the system
-      - state reconciliation with the scheduler (in case it thinks we're running
-        something we're not or vice versa.)
-  """
-  MAX_PID_TIME_DRIFT = Amount(10, Time.SECONDS)
-  MAX_CHECKPOINT_TIME_DRIFT = Amount(1, Time.HOURS)  # maximum runner disconnection time
-
-  # how old a task must be before we're willing to kill it, assuming that there could be
-  # slight races in the following scenario:
-  #    launch gc with retained_tasks={t1, t2, t3}
-  #    launch task t4
-  MINIMUM_KILL_AGE = Amount(10, Time.MINUTES)
-
-  # wait time between checking for new GC events from the slave and/or cleaning orphaned tasks
-  POLL_WAIT = Amount(5, Time.MINUTES)
-
-  # maximum lifetime of this executor.  this is to prevent older GC executor binaries from
-  # running forever
-  MAXIMUM_EXECUTOR_LIFETIME = Amount(1, Time.DAYS)
-
-  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
-
-  def __init__(self,
-               path_detector,
-               verbose=True,
-               task_killer=TaskKiller,
-               executor_detector=ExecutorDetector,
-               task_garbage_collector=TaskGarbageCollector,
-               clock=time):
-    ExecutorBase.__init__(self)
-    ExceptionalThread.__init__(self)
-    self.daemon = True
-    self._stop_event = threading.Event()
-    # mapping of task_id => (TaskInfo, AdjustRetainedTasks), in the order in
-    # which they were received via a launchTask.
-    self._gc_task_queue = OrderedDict()
-    # cache the ExecutorDriver provided by the slave, so we can use it out
-    # of band from slave-initiated callbacks.  This should be supplied by
-    # ExecutorBase.registered() when the executor first registers with the
-    # slave.
-    self._driver = None
-    self._slave_id = None  # cache the slave ID provided by the slave
-    self._task_id = None  # the task_id currently being executed by the ThermosGCExecutor, if any
-    self._start_time = None  # the start time of a task currently being executed, if any
-    self._executor_detector = executor_detector()
-    self._collector_class = task_garbage_collector
-    self._clock = clock
-    self._task_killer = task_killer
-    if not isinstance(path_detector, PathDetector):
-      raise TypeError('ThermosGCExecutor expects a path_detector of type PathDetector, got %s' %
-          type(path_detector))
-    self._path_detector = path_detector
-    self._dropped_tasks = AtomicGauge('dropped_tasks')
-    self.metrics.register(self._dropped_tasks)
-
-  def _runner_ckpt(self, task):
-    """Return the runner checkpoint file for a given task.
-
-    :param task: An instance of a task to retrieve checkpoint path
-    :type task: :class:`RootedTask` instance
-    """
-    return TaskPath(root=task.checkpoint_root, task_id=task.task_id).getpath('runner_checkpoint')
-
-  def _terminate_task(self, task, kill=True):
-    """Terminate a task using the associated task killer. Returns a boolean indicating success."""
-    killer = self._task_killer(task.task_id, task.checkpoint_root)
-    self.log('Terminating %s...' % task.task_id)
-    runner_terminate = killer.kill if kill else killer.lose
-    try:
-      runner_terminate(force=True)
-      return True
-    except Exception as e:
-      self.log('Could not terminate: %s' % e)
-      return False
-
-  def partition_tasks(self):
-    """Return active/finished tasks as discovered from the checkpoint roots."""
-    active_tasks, finished_tasks = set(), set()
-
-    for checkpoint_root in self._path_detector.get_paths():
-      detector = TaskDetector(root=checkpoint_root)
-
-      active_tasks.update(RootedTask(checkpoint_root, task_id)
-          for _, task_id in detector.get_task_ids(state='active'))
-      finished_tasks.update(RootedTask(checkpoint_root, task_id)
-          for _, task_id in detector.get_task_ids(state='finished'))
-
-    return active_tasks, finished_tasks
-
-  def get_states(self, task):
-    """Returns the (timestamp, status) tuples of the task or [] if could not replay."""
-    statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task))
-    try:
-      return [(state.timestamp_ms / 1000.0, state.state) for state in statuses]
-    except CheckpointDispatcher.ErrorRecoveringState:
-      return []
-
-  def get_sandbox(self, task):
-    """Returns the sandbox of the task, or None if it has not yet been initialized."""
-    try:
-      for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task)):
-        if update.runner_header and update.runner_header.sandbox:
-          return update.runner_header.sandbox
-    except CheckpointDispatcher.ErrorRecoveringState:
-      return None
-
-  def maybe_terminate_unknown_task(self, task):
-    """Terminate a task if we believe the scheduler doesn't know about it.
-
-       It's possible for the scheduler to queue a GC and launch a task afterwards, in which
-       case we may see actively running tasks that the scheduler did not report in the
-       AdjustRetainedTasks message.
-
-       Returns:
-         boolean indicating whether the task was terminated
-    """
-    states = self.get_states(task)
-    if states:
-      task_start_time, _ = states[0]
-      if self._start_time - task_start_time > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-        return self._terminate_task(task)
-    return False
-
-  def should_gc_task(self, task):
-    """Check if a possibly-corrupt task should be locally GCed
-
-      A task should be GCed if its checkpoint stream appears to be corrupted and the kill age
-      threshold is exceeded.
-
-       Returns:
-         set, containing the task if it should be marked for local GC, or empty otherwise
-    """
-    runner_ckpt = self._runner_ckpt(task)
-    if not os.path.exists(runner_ckpt):
-      return set()
-    latest_update = os.path.getmtime(runner_ckpt)
-    if self._start_time - latest_update > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-      self.log('Got corrupt checkpoint file for %s - marking for local GC' % task.task_id)
-      return set([task])
-    else:
-      self.log('Checkpoint file unreadable, but not yet beyond MINIMUM_KILL_AGE threshold')
-      return set()
-
-  def reconcile_states(self, driver, retained_tasks):
-    """Reconcile states that the scheduler thinks tasks are in vs what they really are in.
-
-        Local    vs   Scheduler  => Action
-       ===================================
-        ACTIVE         ACTIVE    => no-op
-        ACTIVE        ASSIGNED   => no-op
-        ACTIVE        TERMINAL   => maybe kill task*
-        ACTIVE        !EXISTS    => maybe kill task*
-       TERMINAL        ACTIVE    => send actual status**
-       TERMINAL       ASSIGNED   => send actual status**
-       TERMINAL       TERMINAL   => no-op
-       TERMINAL       !EXISTS    => gc locally
-       !EXISTS         ACTIVE    => send LOST**
-       !EXISTS        ASSIGNED   => no-op
-       !EXISTS        TERMINAL   => gc remotely
-
-       * - Only kill if this does not appear to be a race condition.
-       ** - These appear to have no effect
-
-       Side effecting operations:
-         ACTIVE   | (TERMINAL / !EXISTS) => maybe kill
-         TERMINAL | !EXISTS              => delete
-         !EXISTS  | TERMINAL             => delete
-
-      Returns tuple of (local_gc, remote_gc, updates), where:
-        local_gc - set of RootedTasks to be GCed locally
-        remote_gc - set of task_ids to be deleted on the scheduler
-        updates - dictionary of updates sent to the scheduler (task_id: ScheduleStatus)
-    """
-    def partition(rt):
-      active, assigned, finished = set(), set(), set()
-      for task, schedule_status in rt.items():
-        if schedule_status in TERMINAL_STATES:
-          finished.add(task)
-        elif schedule_status == ScheduleStatus.ASSIGNED:
-          assigned.add(task)
-        else:
-          active.add(task)
-      return active, assigned, finished
-
-    local_active, local_finished = self.partition_tasks()
-    sched_active, sched_assigned, sched_finished = partition(retained_tasks)
-    local_tasks = local_active | local_finished
-    sched_tasks = sched_active | sched_assigned | sched_finished
-
-    self.log('Told to retain the following task ids:')
-    for task_id, schedule_status in retained_tasks.items():
-      self.log('  => %s as %s' %
-          (task_id, ScheduleStatus._VALUES_TO_NAMES.get(schedule_status, 'UNKNOWN')))
-
-    self.log('Local active tasks:')
-    for task in local_active:
-      self.log('  => %s' % task.task_id)
-
-    self.log('Local finished tasks:')
-    for task in local_finished:
-      self.log('  => %s' % task.task_id)
-
-    local_gc, remote_gc_ids = set(), set()
-    updates = {}
-
-    for task in local_active:
-      if task.task_id not in (sched_active | sched_assigned):
-        self.log('Inspecting task %s for termination.' % task.task_id)
-        if not self.maybe_terminate_unknown_task(task):
-          local_gc.update(self.should_gc_task(task))
-
-    for task in local_finished:
-      if task.task_id not in sched_tasks:
-        self.log('Queueing task %s for local deletion.' % task.task_id)
-        local_gc.add(task)
-      if task.task_id in (sched_active | sched_assigned):
-        self.log('Task %s finished but scheduler thinks active/assigned.' % task.task_id)
-        states = self.get_states(task)
-        if states:
-          _, last_state = states[-1]
-          updates[task.task_id] = THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.LOST)
-          self.send_update(
-              driver,
-              task.task_id,
-              THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb2.TASK_LOST),
-              'Task finish detected by GC executor.')
-        else:
-          local_gc.update(self.should_gc_task(task))
-
-    local_task_ids = set(task.task_id for task in local_tasks)
-
-    for task_id in sched_finished:
-      if task_id not in local_task_ids:
-        self.log('Queueing task %s for remote deletion.' % task_id)
-        remote_gc_ids.add(task_id)
-
-    for task_id in sched_active:
-      if task_id not in local_task_ids:
-        self.log('Know nothing about task %s, telling scheduler of LOSS.' % task_id)
-        updates[task_id] = ScheduleStatus.LOST
-        self.send_update(
-            driver, task_id, mesos_pb2.TASK_LOST, 'GC executor found no trace of task.')
-
-    for task_id in sched_assigned:
-      if task_id not in local_task_ids:
-        self.log('Know nothing about task %s, but scheduler says ASSIGNED - passing' % task_id)
-
-    return local_gc, remote_gc_ids, updates
-
-  def clean_orphans(self, driver):
-    """Inspect checkpoints for trees that have been kill -9'ed but not properly cleaned up."""
-    self.log('Checking for orphaned tasks')
-    active_tasks, _ = self.partition_tasks()
-    updates = {}
-
-    def is_our_process(process, uid, timestamp):
-      if process.uids().real != uid:
-        return False
-      estimated_start_time = self._clock.time() - process.create_time()
-      return abs(timestamp - estimated_start_time) < self.MAX_PID_TIME_DRIFT.as_(Time.SECONDS)
-
-    for task in active_tasks:
-      inspector = CheckpointInspector(task.checkpoint_root)
-
-      self.log('Inspecting running task: %s' % task.task_id)
-      inspection = inspector.inspect(task.task_id)
-      if not inspection:
-        self.log('  - Error inspecting task runner')
-        continue
-      latest_runner = inspection.runner_processes[-1]
-      # Assume that it has not yet started?
-      if not latest_runner:
-        self.log('  - Task has no registered runners.')
-        continue
-      runner_pid, runner_uid, timestamp_ms = latest_runner
-      try:
-        runner_process = psutil.Process(runner_pid)
-        if is_our_process(runner_process, runner_uid, timestamp_ms / 1000.0):
-          self.log('  - Runner appears healthy.')
-          continue
-      except psutil.NoSuchProcess:
-        # Runner is dead
-        pass
-      except psutil.Error as err:
-        self.log('  - Error sampling runner process [pid=%s]: %s' % (runner_pid, err))
-        continue
-      try:
-        latest_update = os.path.getmtime(self._runner_ckpt(task))
-      except (IOError, OSError) as err:
-        self.log('  - Error accessing runner ckpt: %s' % err)
-        continue
-      if self._clock.time() - latest_update < self.MAX_CHECKPOINT_TIME_DRIFT.as_(Time.SECONDS):
-        self.log('  - Runner is dead but under LOST threshold.')
-        continue
-      self.log('  - Runner is dead but beyond LOST threshold: %.1fs' % (
-          self._clock.time() - latest_update))
-      if self._terminate_task(task, kill=False):
-        updates[task.task_id] = ScheduleStatus.LOST
-        self.send_update(
-            driver, task.task_id, mesos_pb2.TASK_LOST, 'GC executor detected failed task runner.')
-
-    return updates
-
-  def _erase_sandbox(self, task):
-    header_sandbox = self.get_sandbox(task)
-    directory_sandbox = DirectorySandbox(header_sandbox) if header_sandbox else None
-    if directory_sandbox and directory_sandbox.exists():
-      self.log('Destroying DirectorySandbox for %s' % task.task_id)
-      try:
-        directory_sandbox.destroy()
-      except DirectorySandbox.Error as e:
-        self.log('Failed to destroy DirectorySandbox: %s' % e)
-    else:
-      self.log('Found no sandboxes for %s' % task.task_id)
-
-  def _gc(self, task):
-    """Erase the sandbox, logs and metadata of the given task."""
-
-    self.log('Erasing sandbox for %s' % task.task_id)
-    self._erase_sandbox(task)
-
-    collector = self._collector_class(task.checkpoint_root, task.task_id)
-
-    self.log('Erasing logs for %s' % task.task_id)
-    collector.erase_logs()
-
-    self.log('Erasing metadata for %s' % task.task_id)
-    collector.erase_metadata()
-
-  def garbage_collect(self, force_delete=frozenset()):
-    """Garbage collect tasks on the system no longer active or in the supplied force_delete.
-
-    Return a set of task_ids representing the tasks that were garbage collected.
-    """
-    active_tasks, finished_tasks = self.partition_tasks()
-    retained_executors = set(iter(self.linked_executors))
-    self.log('Executor sandboxes retained by Mesos:')
-    if retained_executors:
-      for r_e in sorted(retained_executors):
-        self.log('  %s' % r_e)
-    else:
-      self.log('  None')
-    for task in active_tasks:
-      if task.task_id not in retained_executors:
-        self.log('ERROR: Active task %s had its executor sandbox pulled.' % task.task_id)
-    gc_tasks = set()
-    for task in finished_tasks:
-      if task.task_id not in retained_executors:
-        gc_tasks.add(task)
-    gc_tasks.update(force_delete)
-    for gc_task in gc_tasks:
-      self._gc(gc_task)
-    return set(task.task_id for task in gc_tasks)
-
-  @property
-  def linked_executors(self):
-    """Generator yielding the executor sandboxes detected on the system."""
-    thermos_executor_prefix = 'thermos-'
-    for executor in self._executor_detector:
-      # It's possible for just the 'latest' symlink to be present but no run directories.
-      # This indicates that the task has been fully garbage collected.
-      if executor.executor_id.startswith(thermos_executor_prefix) and executor.run != 'latest':
-        yield executor.executor_id[len(thermos_executor_prefix):]
-
-  def _run_gc(self, task, retain_tasks, retain_start):
-    """
-      Reconcile the set of tasks to retain (provided by the scheduler) with the current state of
-      executors on this system. Garbage collect tasks/executors as appropriate.
-
-      Not re-entrant! Previous executions must complete (and clear self._task_id) before this can be
-      invoked.
-
-      Potentially blocking (e.g. on I/O) in self.garbage_collect()
-
-      Args:
-        task: TaskInfo provided by the slave
-        retain_tasks: mapping of task_id => ScheduleStatus, describing what the scheduler thinks is
-                      running on this system
-        retain_start: the time at which the retain_tasks message is effective -- this means that
-                      tasks started after the retain_tasks message is effective are skipped
-                      until future GC runs.
-    """
-    task_id = task.task_id.value
-    if self._task_id is not None:
-      raise RuntimeError('_run_gc() called [task_id=%s], but already running [task_id=%s]'
-                         % (task_id, self._task_id))
-    self._task_id = task_id
-    self.log('Launching garbage collection [task_id=%s]' % task_id)
-    self._start_time = retain_start
-    local_gc, _, _ = self.reconcile_states(self._driver, retain_tasks)
-    self.garbage_collect(local_gc)
-    self.send_update(
-        self._driver, task.task_id.value, mesos_pb2.TASK_FINISHED, 'Garbage collection finished.')
-    self.log('Garbage collection complete [task_id=%s]' % task_id)
-    self._task_id = self._start_time = None
-
-  def run(self):
-    """Main GC executor event loop.
-
-      Periodically perform state reconciliation with the set of tasks provided
-      by the slave, and garbage collect orphaned tasks on the system.
-    """
-    run_start = self._clock.time()
-
-    def should_terminate():
-      now = self._clock.time()
-      if now > run_start + self.MAXIMUM_EXECUTOR_LIFETIME.as_(Time.SECONDS):
-        return True
-      return self._stop_event.is_set()
-
-    while not should_terminate():
-      try:
-        _, (task, retain_tasks, retain_start) = self._gc_task_queue.popitem(0)
-        self._run_gc(task, retain_tasks, retain_start)
-      except KeyError:  # no enqueued GC tasks
-        pass
-      if self._driver is not None:
-        self.clean_orphans(self._driver)
-      # TODO(wickman) This should be polling with self._clock
-      self._stop_event.wait(self.POLL_WAIT.as_(Time.SECONDS))
-
-    # shutdown
-    if self._driver is not None:
-      try:
-        prev_task_id, _ = self._gc_task_queue.popitem(0)
-      except KeyError:  # no enqueued GC tasks
-        pass
-      else:
-        self.send_update(self._driver, prev_task_id, mesos_pb2.TASK_FINISHED,
-                         'Garbage collection skipped - GC executor shutting down')
-        # TODO(jon) Remove this once external MESOS-243 is resolved.
-        self.log('Sleeping briefly to mitigate https://issues.apache.org/jira/browse/MESOS-243')
-        self.log('Clock is %r, time is %s' % (self._clock, self._clock.time()))
-        self._clock.sleep(self.PERSISTENCE_WAIT.as_(Time.SECONDS))
-        self.log('Finished sleeping.')
-
-      self._driver.stop()
-
-  """ Mesos Executor API methods follow """
-
-  def launchTask(self, driver, task):
-    """Queue a new garbage collection run, and drop any currently-enqueued runs."""
-    if self._slave_id is None:
-      self._slave_id = task.slave_id.value
-    task_id = task.task_id.value
-    self.log('launchTask() got task_id: %s' % task_id)
-    if self._stop_event.is_set():
-      self.log('=> Executor is shutting down - ignoring task %s' % task_id)
-      self.send_update(
-          self._driver, task_id, mesos_pb2.TASK_FAILED, 'GC Executor is shutting down.')
-      return
-    elif task_id == self._task_id:
-      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
-      return
-    elif task_id in self._gc_task_queue:
-      self.log('=> Already have task_id %s queued - ignoring' % task_id)
-      return
-    try:
-      art = thrift_deserialize(AdjustRetainedTasks(), task.data)
-    except Exception as err:
-      self.log('Error deserializing task: %s' % err)
-      self.send_update(
-          self._driver, task_id, mesos_pb2.TASK_FAILED, 'Deserialization of GC task failed')
-      return
-    try:
-      prev_task_id, _ = self._gc_task_queue.popitem(0)
-    except KeyError:  # no enqueued GC tasks - reset counter
-      self._dropped_tasks.write(0)
-    else:
-      self.log('=> Dropping previously queued GC with task_id %s' % prev_task_id)
-      self._dropped_tasks.increment()
-      self.log('=> Updating scheduler')
-      self.send_update(self._driver, prev_task_id, mesos_pb2.TASK_FINISHED,
-                       'Garbage collection skipped - GC executor received another task')
-    self.log('=> Adding %s to GC queue' % task_id)
-    self._gc_task_queue[task_id] = (task, art.retainedTasks, self._clock.time())
-
-  def killTask(self, driver, task_id):
-    """Remove the specified task from the queue, if it's not yet run. Otherwise, no-op."""
-    self.log('killTask() got task_id: %s' % task_id)
-    task = self._gc_task_queue.pop(task_id, None)
-    if task is not None:
-      self.log('=> Removed %s from queued GC tasks' % task_id)
-    elif task_id == self._task_id:
-      self.log('=> GC with task_id %s currently running - ignoring' % task_id)
-    else:
-      self.log('=> Unknown task_id %s - ignoring' % task_id)
-
-  def shutdown(self, driver):
-    """Trigger the Executor to shut down as soon as the current GC run is finished."""
-    self.log('shutdown() called - setting stop event')
-    self._stop_event.set()
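
The decision table in reconcile_states() above is the heart of the deleted executor. For
clarity, here it is distilled into a standalone function (a sketch: the state tokens and
action names are illustrative labels, not Aurora identifiers):

ACTIVE, ASSIGNED, TERMINAL, ABSENT = 'ACTIVE', 'ASSIGNED', 'TERMINAL', 'ABSENT'


def reconcile_action(local_state, scheduler_state):
  """Map a (local, scheduler) state pair to the GC executor's action."""
  if local_state == ACTIVE:
    # Kills are guarded by MINIMUM_KILL_AGE to avoid racing a fresh launch.
    return 'maybe_kill' if scheduler_state in (TERMINAL, ABSENT) else 'noop'
  if local_state == TERMINAL:
    if scheduler_state in (ACTIVE, ASSIGNED):
      return 'send_actual_status'
    return 'gc_locally' if scheduler_state == ABSENT else 'noop'
  # local_state == ABSENT
  if scheduler_state == ACTIVE:
    return 'send_lost'
  return 'gc_remotely' if scheduler_state == TERMINAL else 'noop'


assert reconcile_action(TERMINAL, ABSENT) == 'gc_locally'
assert reconcile_action(ABSENT, ACTIVE) == 'send_lost'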

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/commands/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/cli/commands/BUILD b/src/main/python/apache/thermos/cli/commands/BUILD
index 1dae8c9..552eeb4 100644
--- a/src/main/python/apache/thermos/cli/commands/BUILD
+++ b/src/main/python/apache/thermos/cli/commands/BUILD
@@ -15,7 +15,6 @@
 python_library(
   name = 'commands',
   dependencies = [
-    ':gc',
     ':help',
     ':inspect',
     ':kill',
@@ -28,17 +27,6 @@ python_library(
 )
 
 python_library(
-  name = 'gc',
-  sources = ['gc.py'],
-  dependencies = [
-    '3rdparty/python:twitter.common.app',
-    '3rdparty/python:twitter.common.quantity',
-    'src/main/python/apache/thermos/cli:common',
-    'src/main/python/apache/thermos/monitoring:garbage',
-  ]
-)
-
-python_library(
   name = 'help',
   sources = ['help.py'],
   dependencies = [

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/commands/gc.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/cli/commands/gc.py b/src/main/python/apache/thermos/cli/commands/gc.py
deleted file mode 100644
index 23d9ff4..0000000
--- a/src/main/python/apache/thermos/cli/commands/gc.py
+++ /dev/null
@@ -1,105 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-from twitter.common import app
-from twitter.common.quantity.parse_simple import parse_data, parse_time
-
-from apache.thermos.cli.common import get_path_detector, tasks_from_re
-from apache.thermos.monitoring.garbage import GarbageCollectionPolicy, TaskGarbageCollector
-
-
-def set_keep(option, opt_str, value, parser):
-  setattr(parser.values, option.dest, opt_str.startswith('--keep'))
-
-
-@app.command
-@app.command_option("--max_age", metavar="AGE", default=None, dest='max_age',
-                    help="Max age in human readable form, e.g. 2d5h or 7200s")
-@app.command_option("--max_tasks", metavar="NUM", default=None, dest='max_tasks',
-                    help="Max number of tasks to keep.")
-@app.command_option("--max_space", metavar="SPACE", default=None, dest='max_space',
-                    help="Max space to allow for tasks, e.g. 20G.")
-@app.command_option("--keep-logs", "--delete-logs",
-                    metavar="PATH", default=True,
-                    action='callback', callback=set_keep, dest='keep_logs',
-                    help="Keep logs.")
-@app.command_option("--keep-metadata", "--delete-metadata",
-                    metavar="PATH", default=True,
-                    action='callback', callback=set_keep, dest='keep_metadata',
-                    help="Keep metadata.")
-@app.command_option("--force", default=False, action='store_true', dest='force',
-                    help="Perform garbage collection without confirmation")
-@app.command_option("--dryrun", default=False, action='store_true', dest='dryrun',
-                    help="Don't actually run garbage collection.")
-def gc(args, options):
-  """Garbage collect task(s) and task metadata.
-
-    Usage: thermos gc [options] [task_id1 task_id2 ...]
-
-    If tasks specified, restrict garbage collection to only those tasks,
-    otherwise all tasks are considered.  The optional constraints are still
-    honored.
-  """
-  print('Analyzing root at %s' % options.root)
-  gc_options = {}
-  if options.max_age is not None:
-    gc_options['max_age'] = parse_time(options.max_age)
-  if options.max_space is not None:
-    gc_options['max_space'] = parse_data(options.max_space)
-  if options.max_tasks is not None:
-    gc_options['max_tasks'] = int(options.max_tasks)
-  gc_options.update(include_metadata=not options.keep_metadata,
-                    include_logs=not options.keep_logs,
-                    verbose=True,
-                    logger=print)
-  if args:
-    gc_tasks = list(tasks_from_re(args, state='finished'))
-  else:
-    print('No task ids specified, using default collector.')
-    gc_tasks = [(task.checkpoint_root, task.task_id)
-        for task in GarbageCollectionPolicy(get_path_detector(), **gc_options).run()]
-
-  if not gc_tasks:
-    print('No tasks to garbage collect.  Exiting')
-    return
-
-  def maybe(function, *args):
-    if options.dryrun:
-      print('    would run %s%r' % (function.__name__, args))
-    else:
-      function(*args)
-
-  value = 'y'
-  if not options.force:
-    value = raw_input("Continue [y/N]? ") or 'N'
-  if value.lower() == 'y':
-    print('Running gc...')
-
-    for checkpoint_root, task_id in gc_tasks:
-      tgc = TaskGarbageCollector(checkpoint_root, task_id)
-      print('  Task %s ' % task_id, end='')
-      print('data (%s) ' % ('keeping' if options.keep_data else 'deleting'), end='')
-      print('logs (%s) ' % ('keeping' if options.keep_logs else 'deleting'), end='')
-      print('metadata (%s) ' % ('keeping' if options.keep_metadata else 'deleting'))
-      if not options.keep_data:
-        maybe(tgc.erase_data)
-      if not options.keep_logs:
-        maybe(tgc.erase_logs)
-      if not options.keep_metadata:
-        maybe(tgc.erase_metadata)
-      print('done.')
-  else:
-    print('Cancelling gc.')
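
For the record, the deleted command's flags mapped directly onto GarbageCollectionPolicy
keyword arguments (defined in garbage.py below). A sketch of that mapping, with example
flag values:

from __future__ import print_function

from twitter.common.quantity.parse_simple import parse_data, parse_time

gc_options = {
    'max_age': parse_time('2d5h'),   # --max_age
    'max_space': parse_data('20G'),  # --max_space
    'max_tasks': 100,                # --max_tasks
    'include_metadata': True,        # i.e. --delete-metadata
    'include_logs': True,            # i.e. --delete-logs
    'verbose': True,
    'logger': print,
}
# Selecting and erasing tasks required a live checkpoint root, so the calls
# are shown here rather than run:
#   policy = GarbageCollectionPolicy(get_path_detector(), **gc_options)
#   for task in policy.run():
#     TaskGarbageCollector(task.checkpoint_root, task.task_id).erase_task()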

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/cli/main.py b/src/main/python/apache/thermos/cli/main.py
index f20f612..bb51353 100644
--- a/src/main/python/apache/thermos/cli/main.py
+++ b/src/main/python/apache/thermos/cli/main.py
@@ -20,7 +20,6 @@ from .common import clear_path_detectors, register_path_detector
 def register_commands(app):
   from apache.thermos.cli.common import generate_usage
   from apache.thermos.cli.commands import (
-      gc as gc_command,
       help as help_command,
       inspect as inspect_command,
       kill as kill_command,
@@ -32,7 +31,6 @@ def register_commands(app):
   )
 
   app.register_commands_from(
-      gc_command,
       help_command,
       inspect_command,
       kill_command,

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/BUILD b/src/main/python/apache/thermos/core/BUILD
index efb68e8..d47b7a2 100644
--- a/src/main/python/apache/thermos/core/BUILD
+++ b/src/main/python/apache/thermos/core/BUILD
@@ -30,18 +30,6 @@ python_library(
 )
 
 python_library(
-  name = 'inspector',
-  sources = ['inspector.py'],
-  dependencies = [
-    ':muxer',
-    '3rdparty/python:twitter.common.recordio',
-    'src/main/python/apache/thermos/common:ckpt',
-    'src/main/python/apache/thermos/common:path',
-    'api/src/main/thrift/org/apache/thermos:py-thrift',
-  ]
-)
-
-python_library(
   name = 'muxer',
   sources = ['muxer.py'],
   dependencies = [
@@ -87,7 +75,6 @@ python_library(
 python_library(
   name = 'core',
   dependencies = [
-    ':inspector',
     ':runner',
 
     # covering libs

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py
index 8cd3294..dda40ed 100644
--- a/src/main/python/apache/thermos/core/helper.py
+++ b/src/main/python/apache/thermos/core/helper.py
@@ -30,24 +30,6 @@ from apache.thermos.common.path import TaskPath
 from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt, TaskState, TaskStatus
 
 
-class TaskKiller(object):
-  """
-    Task killing interface.
-  """
-
-  def __init__(self, task_id, checkpoint_root):
-    self._task_id = task_id
-    self._checkpoint_root = checkpoint_root
-
-  def kill(self, force=True):
-    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
-                          terminal_status=TaskState.KILLED)
-
-  def lose(self, force=True):
-    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
-                          terminal_status=TaskState.LOST)
-
-
 class TaskRunnerHelper(object):
   """
     TaskRunner helper methods that can be operated directly upon checkpoint

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/inspector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/inspector.py b/src/main/python/apache/thermos/core/inspector.py
deleted file mode 100644
index 4fe8aa3..0000000
--- a/src/main/python/apache/thermos/core/inspector.py
+++ /dev/null
@@ -1,115 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import pwd
-from collections import namedtuple
-from contextlib import closing
-
-from twitter.common import log
-from twitter.common.recordio import RecordIO, ThriftRecordReader
-
-from apache.thermos.common.ckpt import CheckpointDispatcher
-from apache.thermos.common.path import TaskPath
-
-from .muxer import ProcessMuxer
-
-from gen.apache.thermos.ttypes import ProcessState, RunnerCkpt, RunnerState
-
-CheckpointInspection = namedtuple('CheckpointInspection',
-    ['runner_latest_update',
-     'process_latest_update',
-     'runner_processes',
-     'coordinator_processes',
-     'processes'])
-
-
-class CheckpointInspector(object):
-  def __init__(self, checkpoint_root):
-    self._path = TaskPath(root=checkpoint_root)
-
-  @staticmethod
-  def get_timestamp(process_record):
-    if process_record:
-      for timestamp in ('fork_time', 'start_time', 'stop_time'):
-        stamp = getattr(process_record, timestamp, None)
-        if stamp:
-          return stamp
-    return 0
-
-  def inspect(self, task_id):
-    """
-      Reconstructs the checkpoint stream and returns a CheckpointInspection.
-    """
-    dispatcher = CheckpointDispatcher()
-    state = RunnerState(processes={})
-    muxer = ProcessMuxer(self._path.given(task_id=task_id))
-
-    runner_processes = []
-    coordinator_processes = set()
-    processes = set()
-
-    def consume_process_record(record):
-      if not record.process_status:
-        return
-      try:
-        user_uid = pwd.getpwnam(state.header.user).pw_uid
-      except KeyError:
-        log.error('Could not find user: %s' % state.header.user)
-        return
-      if record.process_status.state == ProcessState.FORKED:
-        coordinator_processes.add((record.process_status.coordinator_pid, user_uid,
-                                   record.process_status.fork_time))
-      elif record.process_status.state == ProcessState.RUNNING:
-        processes.add((record.process_status.pid, user_uid,
-                       record.process_status.start_time))
-
-    # replay runner checkpoint
-    runner_pid = None
-    runner_latest_update = 0
-    try:
-      with open(self._path.given(task_id=task_id).getpath('runner_checkpoint')) as fp:
-        with closing(ThriftRecordReader(fp, RunnerCkpt)) as ckpt:
-          for record in ckpt:
-            dispatcher.dispatch(state, record)
-            runner_latest_update = max(runner_latest_update,
-                self.get_timestamp(record.process_status))
-            # collect all bound runners
-            if record.task_status:
-              if record.task_status.runner_pid != runner_pid:
-                runner_processes.append((record.task_status.runner_pid,
-                                         record.task_status.runner_uid or 0,
-                                         record.task_status.timestamp_ms))
-                runner_pid = record.task_status.runner_pid
-            elif record.process_status:
-              consume_process_record(record)
-    except (IOError, OSError, RecordIO.Error) as err:
-      log.debug('Error inspecting task runner checkpoint: %s' % err)
-      return
-
-    # register existing processes in muxer
-    for process_name in state.processes:
-      muxer.register(process_name)
-
-    # read process checkpoints
-    process_latest_update = runner_latest_update
-    for record in muxer.select():
-      process_latest_update = max(process_latest_update, self.get_timestamp(record.process_status))
-      consume_process_record(record)
-
-    return CheckpointInspection(
-      runner_latest_update=runner_latest_update,
-      process_latest_update=process_latest_update,
-      runner_processes=runner_processes,
-      coordinator_processes=coordinator_processes,
-      processes=processes)
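
Upstream, clean_orphans() in the deleted gc_executor.py consumed this inspection to decide
whether the checkpointed task runner was still alive. A simplified sketch of that check: it
compares the checkpointed registration time directly against the live process's creation
time rather than reproducing is_our_process() verbatim, and the drift constant mirrors
ThermosGCExecutor.MAX_PID_TIME_DRIFT:

import psutil

MAX_PID_TIME_DRIFT_SECS = 10.0


def runner_appears_healthy(inspection):
  """Return True if the latest registered runner still looks like our process."""
  if inspection is None or not inspection.runner_processes:
    return False
  latest_runner = inspection.runner_processes[-1]
  if not latest_runner:
    return False  # runner registered but never started
  pid, uid, timestamp_ms = latest_runner
  try:
    process = psutil.Process(pid)
  except psutil.NoSuchProcess:
    return False  # runner is dead
  except psutil.Error:
    return False  # could not sample the process
  if process.uids().real != uid:
    return False  # pid has been recycled by another user
  return abs(timestamp_ms / 1000.0 - process.create_time()) < MAX_PID_TIME_DRIFT_SECS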

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/BUILD b/src/main/python/apache/thermos/monitoring/BUILD
index 633dd95..e885102 100644
--- a/src/main/python/apache/thermos/monitoring/BUILD
+++ b/src/main/python/apache/thermos/monitoring/BUILD
@@ -25,19 +25,6 @@ python_library(
 )
 
 python_library(
-  name = 'garbage',
-  sources = ['garbage.py'],
-  dependencies = [
-    ':detector',
-    '3rdparty/python:twitter.common.dirutil',
-    '3rdparty/python:twitter.common.lang',
-    '3rdparty/python:twitter.common.quantity',
-    'src/main/python/apache/thermos/common:ckpt',
-    'src/main/python/apache/thermos/common:path',
-  ]
-)
-
-python_library(
   name = 'monitor',
   sources = ['monitor.py'],
   dependencies = [
@@ -95,7 +82,6 @@ python_library(
   dependencies = [
     ':detector',
     ':disk',
-    ':garbage',
     ':monitor',
     ':process',
     ':resource',

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py
deleted file mode 100644
index aa5a272..0000000
--- a/src/main/python/apache/thermos/monitoring/garbage.py
+++ /dev/null
@@ -1,198 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import os
-import sys
-import time
-from collections import namedtuple
-
-from twitter.common.dirutil import safe_bsize, safe_delete, safe_mtime, safe_rmtree
-from twitter.common.quantity import Amount, Data, Time
-
-from apache.thermos.common.ckpt import CheckpointDispatcher
-from apache.thermos.common.path import TaskPath
-
-from .detector import TaskDetector
-
-
-class TaskGarbageCollector(object):
-  """A task wrapper to manage its sandbox and log state."""
-
-  def __init__(self, checkpoint_root, task_id):
-    """
-    :param checkpoint_root: The checkpoint root to find the given task.
-    :param task_id: The task_id of the task whose state we wish to manage.
-    """
-
-    self._detector = TaskDetector(checkpoint_root)
-    self._task_id = task_id
-    self._pathspec = TaskPath(root=checkpoint_root, task_id=task_id)
-    self._state = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
-
-  def get_age(self):
-    return safe_mtime(self._detector.get_checkpoint(self._task_id))
-
-  def get_metadata(self, with_size=True):
-    runner_ckpt = self._detector.get_checkpoint(self._task_id)
-    process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(self._task_id)]
-    # assumes task is in finished state.
-    json_spec = self._pathspec.given(state='finished').getpath('task_path')
-    for path in [json_spec, runner_ckpt] + process_ckpts:
-      if with_size:
-        yield path, safe_bsize(path)
-      else:
-        yield path
-
-  def get_logs(self, with_size=True):
-    if self._state and self._state.header and self._state.header.log_dir:
-      for path in self._detector.get_process_logs(self._task_id, self._state.header.log_dir):
-        if with_size:
-          yield path, safe_bsize(path)
-        else:
-          yield path
-
-  def get_data(self, with_size=True):
-    if self._state and self._state.header and self._state.header.sandbox:
-      for root, dirs, files in os.walk(self._state.header.sandbox):
-        for file in files:
-          filename = os.path.join(root, file)
-          if with_size:
-            yield filename, safe_bsize(filename)
-          else:
-            yield filename
-
-  def erase_task(self):
-    self.erase_data()
-    self.erase_logs()
-    self.erase_metadata()
-
-  def erase_metadata(self):
-    for fn in self.get_metadata(with_size=False):
-      safe_delete(fn)
-    safe_rmtree(self._pathspec.getpath('checkpoint_path'))
-
-  def erase_logs(self):
-    for fn in self.get_logs(with_size=False):
-      safe_delete(fn)
-    if self._state and self._state.header:
-      path = self._pathspec.given(log_dir=self._state.header.log_dir).getpath('process_logbase')
-      safe_rmtree(path)
-
-  def erase_data(self):
-    for fn in self.get_data(with_size=False):
-      safe_delete(fn)
-    if self._state and self._state.header and self._state.header.sandbox:
-      safe_rmtree(self._state.header.sandbox)
-
-
-class GarbageCollectionPolicy(object):
-  def __init__(self,
-               path_detector,
-               max_age=Amount(10 ** 10, Time.DAYS),
-               max_space=Amount(10 ** 10, Data.TB),
-               max_tasks=10 ** 10,
-               include_metadata=True,
-               include_logs=True,
-               verbose=False,
-               logger=sys.stdout.write):
-    """
-      Default garbage collection policy.
-
-      Arguments that may be specified:
-        max_age:   Amount(Time) (max age of a retained task)  [default: infinity]
-        max_space: Amount(Data) (max space to keep)           [default: infinity]
-        max_tasks: int (max number of tasks to keep)          [default: infinity]
-        include_metadata: boolean  (Whether or not to include metadata in the
-          space calculations.)  [default: True]
-        include_logs: boolean  (Whether or not to include logs in the
-          space calculations.)  [default: True]
-        verbose: boolean (whether or not to log)  [default: False]
-        logger: callable (function to call with log messages) [default: sys.stdout.write]
-    """
-    self._path_detector = path_detector
-    self._max_age = max_age
-    self._max_space = max_space
-    self._max_tasks = max_tasks
-    self._include_metadata = include_metadata
-    self._include_logs = include_logs
-    self._verbose = verbose
-    self._logger = logger
-
-  def log(self, msg):
-    if self._verbose:
-      self._logger(msg)
-
-  def get_finished_tasks(self):
-    """Yields (checkpoint_root, task_id) for finished tasks."""
-
-    for checkpoint_root in self._path_detector.get_paths():
-      for task_id in TaskDetector(checkpoint_root).get_task_ids(state='finished'):
-        yield (checkpoint_root, task_id)
-
-  def run(self):
-    tasks = []
-    now = time.time()
-
-    # age: The time (in seconds) since the last task transition to/from ACTIVE/FINISHED
-    # metadata_size: The size of the thermos checkpoint records for this task
-    # log_size: The size of the stdout/stderr logs for this task's processes
-    # data_size: The size of the sandbox of this task.
-    TaskTuple = namedtuple('TaskTuple',
-        'checkpoint_root task_id age metadata_size log_size data_size')
-
-    for checkpoint_root, task_id in self.get_finished_tasks():
-      collector = TaskGarbageCollector(checkpoint_root, task_id)
-
-      age = Amount(int(now - collector.get_age()), Time.SECONDS)
-      self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
-      metadata_size = Amount(sum(sz for _, sz in collector.get_metadata()), Data.BYTES)
-      self.log('  metadata %.1fKB ' % metadata_size.as_(Data.KB))
-      log_size = Amount(sum(sz for _, sz in collector.get_logs()), Data.BYTES)
-      self.log('  logs %.1fKB ' % log_size.as_(Data.KB))
-      data_size = Amount(sum(sz for _, sz in collector.get_data()), Data.BYTES)
-      self.log('  data %.1fMB ' % data_size.as_(Data.MB))
-      tasks.append(TaskTuple(checkpoint_root, task_id, age, metadata_size, log_size, data_size))
-
-    gc_tasks = set()
-    gc_tasks.update(task for task in tasks if task.age > self._max_age)
-
-    self.log('After age filter: %s tasks' % len(gc_tasks))
-
-    def total_gc_size(task):
-      return sum([task.data_size,
-                  task.metadata_size if self._include_metadata else Amount(0, Data.BYTES),
-                  task.log_size if self._include_logs else Amount(0, Data.BYTES)],
-                  Amount(0, Data.BYTES))
-
-    total_used = Amount(0, Data.BYTES)
-    for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
-      if task not in gc_tasks:
-        total_used += total_gc_size(task)
-        if total_used > self._max_space:
-          gc_tasks.add(task)
-    self.log('After size filter: %s tasks' % len(gc_tasks))
-
-    for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
-      if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks:
-        gc_tasks.add(task)
-    self.log('After total task filter: %s tasks' % len(gc_tasks))
-
-    self.log('Deciding to garbage collect the following tasks:')
-    if gc_tasks:
-      for task in gc_tasks:
-        self.log('   %s' % repr(task))
-    else:
-      self.log('   None.')
-
-    return gc_tasks

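For readers skimming the deletion above: GarbageCollectionPolicy.run() picked tasks through three successive filters -- age, cumulative space, and total task count. A minimal, dependency-free sketch of that selection logic (Task and select_for_gc are illustrative names, not part of the removed API; the original worked in twitter.common Amount quantities rather than raw ints):

    from collections import namedtuple

    # Illustrative stand-in for the removed TaskTuple; age in seconds,
    # size in bytes.
    Task = namedtuple('Task', 'task_id age size')

    def select_for_gc(tasks, max_age_secs, max_space_bytes, max_tasks):
      # Mirrors the three filters in the deleted run():
      #   1. age:   collect anything older than max_age_secs
      #   2. space: walking survivors oldest-first, collect each task
      #      encountered after the running total exceeds the budget
      #   3. count: collect the oldest survivors until at most
      #      max_tasks remain
      gc = set(t for t in tasks if t.age > max_age_secs)

      total_kept = 0
      for t in sorted(tasks, key=lambda t: t.age, reverse=True):
        if t not in gc:
          total_kept += t.size
          if total_kept > max_space_bytes:
            gc.add(t)

      for t in sorted(tasks, key=lambda t: t.age, reverse=True):
        if t not in gc and len(tasks) - len(gc) > max_tasks:
          gc.add(t)

      return gc

    tasks = [Task('t1', age=86400 * 30, size=5 * 10**9),
             Task('t2', age=86400 * 7, size=3 * 10**9),
             Task('t3', age=3600, size=2 * 10**9)]
    doomed = select_for_gc(tasks, max_age_secs=86400 * 14,
                           max_space_bytes=4 * 10**9, max_tasks=10)
    # t1 fails the age filter; t3 is collected once the running total of
    # survivors (t2, then t3) exceeds the 4 GB budget.

Note the original iteration order: the space filter counts survivors oldest-first, so once the budget is exceeded it is the newer of the remaining tasks that get collected.
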
http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/BUILD b/src/test/python/apache/aurora/executor/BUILD
index f415ecc..e1c635c 100644
--- a/src/test/python/apache/aurora/executor/BUILD
+++ b/src/test/python/apache/aurora/executor/BUILD
@@ -35,29 +35,10 @@ python_test_suite(
 python_test_suite(
   name = 'executor-large',
   dependencies = [
-    ':gc_executor',
     ':thermos_executor',
   ]
 )
 
-
-python_tests(name = 'gc_executor',
-  sources = ['test_gc_executor.py'],
-  dependencies = [
-    '3rdparty/python:mock',
-    '3rdparty/python:twitter.common.app',
-    '3rdparty/python:twitter.common.concurrent',
-    '3rdparty/python:twitter.common.quantity',
-    '3rdparty/python:twitter.common.testing',
-    'src/main/python/apache/thermos/common:path',
-    'src/main/python/apache/thermos/config',
-    'src/main/python/apache/thermos/core:runner',
-    'api/src/main/thrift/org/apache/thermos:py-thrift',
-    'src/main/python/apache/aurora/executor:gc_executor',
-    'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
-  ],
-)
-
 python_tests(name = 'thermos_executor',
   sources = ['test_thermos_executor.py'],
 #  timeout = Amount(5, Time.MINUTES),

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/bin/BUILD b/src/test/python/apache/aurora/executor/bin/BUILD
index 2caab2a..713ce95 100644
--- a/src/test/python/apache/aurora/executor/bin/BUILD
+++ b/src/test/python/apache/aurora/executor/bin/BUILD
@@ -15,21 +15,11 @@
 python_test_suite(
   name = 'all',
   dependencies = [
-    ':gc_executor_entry_point',
     ':thermos_executor_entry_point',
   ]
 )
 
 python_tests(
-  name = 'gc_executor_entry_point',
-  sources = ['test_gc_executor_entry_point.py'],
-  dependencies = [
-    '3rdparty/python:mock',
-    'src/main/python/apache/aurora/executor/bin:gc_executor_source',
-  ],
-)
-
-python_tests(
   name = 'thermos_executor_entry_point',
   sources = ['test_thermos_executor_entry_point.py'],
   dependencies = [

http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py b/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
deleted file mode 100644
index d4c1d57..0000000
--- a/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from mock import create_autospec, Mock, patch
-
-from apache.aurora.executor.bin.gc_executor_main import initialize, proxy_main
-from apache.aurora.executor.gc_executor import ThermosGCExecutor
-from apache.thermos.monitoring.detector import ChainedPathDetector
-
-
-def test_gc_executor_valid_import_dependencies():
-  assert proxy_main is not None
-
-
-class GcExecutorMainTest(unittest.TestCase):
-  def test_chained_path_detector_initialized(self):
-    mock_gc_executor = create_autospec(spec=ThermosGCExecutor)
-    with patch('apache.aurora.executor.bin.gc_executor_main.ThermosGCExecutor',
-        return_value=mock_gc_executor) as mock:
-      with patch('apache.aurora.executor.bin.gc_executor_main.MesosExecutorDriver',
-          return_value=Mock()):
-
-        initialize()
-        assert len(mock.mock_calls) == 1
-        call = mock.mock_calls[0]
-        _, args, _ = call
-        assert len(args) == 1 and isinstance(args[0], ChainedPathDetector)


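The deleted test above is a compact example of a general mock pattern: autospec the collaborator, patch the class at the module where it is looked up, and assert on the recorded constructor call. A self-contained sketch of the same pattern (Detector, Executor, and initialize here are hypothetical stand-ins, not Aurora APIs):

    import unittest

    from mock import create_autospec, patch


    class Detector(object):
      pass


    class Executor(object):
      def __init__(self, detector):
        self._detector = detector


    def initialize():
      # Hypothetical entry point standing in for gc_executor_main.initialize().
      return Executor(Detector())


    class PatchPatternTest(unittest.TestCase):
      def test_executor_constructed_with_detector(self):
        fake = create_autospec(spec=Executor)
        # Patch the name where it is looked up (this module), just as the
        # removed test patched ThermosGCExecutor inside gc_executor_main.
        with patch('%s.Executor' % __name__, return_value=fake) as mock_cls:
          initialize()
          assert len(mock_cls.mock_calls) == 1
          _, args, _ = mock_cls.mock_calls[0]
          assert len(args) == 1 and isinstance(args[0], Detector)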