aurora-commits mailing list archives

From wick...@apache.org
Subject incubator-aurora git commit: Port GC executor to PathDetector interface
Date Wed, 25 Feb 2015 18:06:48 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 4d5885b84 -> 7de346148


Port GC executor to PathDetector interface

This makes the GC executor detect checkpoint roots via the PathDetector
interface.  This paves the way for detecting checkpoint roots both from
/var/run/thermos and from /var/lib/mesos/**.
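
A minimal sketch of the consumption pattern this enables, using the
interfaces as they appear in the diff below (FixedPathDetector wraps a
single fixed root today; get_paths() yields every checkpoint root to scan):

  from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
  from apache.thermos.monitoring.detector import FixedPathDetector, TaskDetector

  # Consumers no longer assume one global checkpoint root; they iterate
  # over whatever roots the detector reports.
  path_detector = FixedPathDetector(DEFAULT_CHECKPOINT_ROOT)
  for checkpoint_root in path_detector.get_paths():
    task_detector = TaskDetector(root=checkpoint_root)
    for _, task_id in task_detector.get_task_ids(state='finished'):
      print('%s %s' % (checkpoint_root, task_id))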

Testing Done:
./pants test.pytest --no-fast src/main/python::
e2e tests
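
The new entry-point tests added under src/test/python/apache/aurora/executor/bin
can presumably also be run on their own with the same target syntax, e.g.:

./pants test.pytest src/test/python/apache/aurora/executor/bin::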

Bugs closed: AURORA-1025

Reviewed at https://reviews.apache.org/r/30749/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/7de34614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/7de34614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/7de34614

Branch: refs/heads/master
Commit: 7de346148768e446518c76b41d3d121e3004bdcd
Parents: 4d5885b
Author: Brian Wickman <wickman@apache.org>
Authored: Wed Feb 25 10:06:38 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Wed Feb 25 10:06:38 2015 -0800

----------------------------------------------------------------------
 .../python/apache/aurora/executor/bin/BUILD     |  35 ++-
 .../aurora/executor/bin/gc_executor_main.py     |  14 +-
 .../executor/bin/thermos_executor_main.py       |  10 +-
 .../apache/aurora/executor/gc_executor.py       | 212 +++++++++++--------
 src/main/python/apache/thermos/bin/thermos.py   |  23 +-
 .../python/apache/thermos/monitoring/garbage.py | 165 +++++++--------
 .../python/apache/aurora/executor/bin/BUILD     |  23 ++
 .../bin/test_gc_executor_entry_point.py         |   3 +
 .../bin/test_thermos_executor_entry_point.py    |   3 +
 .../apache/aurora/executor/test_gc_executor.py  | 123 ++++++-----
 10 files changed, 365 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 6530f49..25cc3d0 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -12,13 +12,10 @@
 # limitations under the License.
 #
 
-python_binary(
-  name = 'thermos_executor',
-  source = 'thermos_executor_main.py',
-  entry_point = 'apache.aurora.executor.bin.thermos_executor_main:proxy_main',
-  always_write_cache = True,
+python_library(
+  name = 'thermos_executor_source',
+  sources = ['thermos_executor_main.py'],
   dependencies = [
-    '3rdparty/python:mesos.native',
     # To prevent an alpha version of protobuf from being pulled down by pants we
     # specify protobuf here. See AURORA-1128 for more details.
     '3rdparty/python:protobuf',
@@ -39,21 +36,39 @@ python_binary(
 )
 
 python_binary(
-  name = 'gc_executor',
-  source = 'gc_executor_main.py',
-  entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main',
+  name = 'thermos_executor',
+  entry_point = 'apache.aurora.executor.bin.thermos_executor_main:proxy_main',
   always_write_cache = True,
   dependencies = [
     '3rdparty/python:mesos.native',
+    ':thermos_executor_source',
+  ]
+)
+
+python_library(
+  name = 'gc_executor_source',
+  sources = ['gc_executor_main.py'],
+  dependencies = [
     # To prevent an alpha version of protobuf from being pulled down by pants we
     # specify protobuf here. See AURORA-1128 for more details.
     '3rdparty/python:protobuf',
     '3rdparty/python:twitter.common.app',
     '3rdparty/python:twitter.common.log',
     '3rdparty/python:twitter.common.metrics',
-    'src/main/python/apache/thermos/common:path',
+    'src/main/python/apache/thermos/common:constants',
+    'src/main/python/apache/thermos/monitoring:detector',
     'src/main/python/apache/aurora/executor/common:executor_detector',
     'src/main/python/apache/aurora/executor:executor_vars',
     'src/main/python/apache/aurora/executor:gc_executor',
   ]
 )
+
+python_binary(
+  name = 'gc_executor',
+  entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main',
+  always_write_cache = True,
+  dependencies = [
+    '3rdparty/python:mesos.native',
+    ':gc_executor_source',
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
index b903bcb..9e65084 100644
--- a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
@@ -19,14 +19,19 @@ slave.
 
 """
 
-from mesos.native import MesosExecutorDriver
+try:
+  from mesos.native import MesosExecutorDriver
+except ImportError:
+  MesosExecutorDriver = None
+
 from twitter.common import app, log
 from twitter.common.log.options import LogOptions
 from twitter.common.metrics.sampler import DiskMetricWriter
 
-from apache.aurora.executor.executor_detector import ExecutorDetector
+from apache.aurora.executor.common.executor_detector import ExecutorDetector
 from apache.aurora.executor.gc_executor import ThermosGCExecutor
 from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
+from apache.thermos.monitoring.detector import FixedPathDetector
 
 app.configure(debug=True)
 
@@ -39,8 +44,11 @@ LogOptions.set_log_dir(ExecutorDetector.LOG_PATH)
 
 def proxy_main():
   def main():
+    if MesosExecutorDriver is None:
+      app.error('Could not load MesosExecutorDriver!')
+
     # Create executor stub
-    thermos_gc_executor = ThermosGCExecutor(checkpoint_root=DEFAULT_CHECKPOINT_ROOT)
+    thermos_gc_executor = ThermosGCExecutor(FixedPathDetector(DEFAULT_CHECKPOINT_ROOT))
     thermos_gc_executor.start()
 
     # Start metrics collection

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 0752d50..3174e1f 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -21,7 +21,6 @@ slave.
 
 import os
 
-from mesos.native import MesosExecutorDriver
 from twitter.common import app, log
 from twitter.common.log.options import LogOptions
 
@@ -37,6 +36,12 @@ from apache.aurora.executor.thermos_task_runner import (
 )
 from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
 
+try:
+  from mesos.native import MesosExecutorDriver
+except ImportError:
+  MesosExecutorDriver = None
+
+
 CWD = os.environ.get('MESOS_SANDBOX', '.')
 
 app.configure(debug=True)
@@ -119,6 +124,9 @@ class UserOverrideDirectorySandboxProvider(DefaultSandboxProvider):
 
 def proxy_main():
   def main(args, options):
+    if MesosExecutorDriver is None:
+      app.error('Could not load MesosExecutorDriver!')
+
     # status providers:
     status_providers = [
         HealthCheckerProvider(),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/aurora/executor/gc_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py
index dbec82f..a7776b5 100644
--- a/src/main/python/apache/aurora/executor/gc_executor.py
+++ b/src/main/python/apache/aurora/executor/gc_executor.py
@@ -23,6 +23,7 @@ slaves utilising the Thermos executor.
 import os
 import threading
 import time
+from collections import namedtuple
 
 import psutil
 from mesos.interface import mesos_pb2
@@ -37,7 +38,7 @@ from apache.thermos.common.ckpt import CheckpointDispatcher
 from apache.thermos.common.path import TaskPath
 from apache.thermos.core.helper import TaskKiller
 from apache.thermos.core.inspector import CheckpointInspector
-from apache.thermos.monitoring.detector import TaskDetector
+from apache.thermos.monitoring.detector import PathDetector, TaskDetector
 from apache.thermos.monitoring.garbage import TaskGarbageCollector
 
 from .common.executor_detector import ExecutorDetector
@@ -69,6 +70,13 @@ THERMOS_TO_MESOS_STATES = {
 }
 
 
+# RootedTask is a (checkpoint_root, task_id) tuple.  Before, checkpoint_root was assumed to be a
+# globally defined location e.g. '/var/run/thermos'.  We are trying to move checkpoints into
+# sandboxes, which means that each task will have its own checkpoint_root, so we can no longer rely
+# upon a single invariant checkpoint root passed into the GC executor constructor.
+RootedTask = namedtuple('RootedTask', 'checkpoint_root task_id')
+
+
 class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
   """
     Thermos GC Executor, responsible for:
@@ -95,7 +103,7 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
   PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
 
   def __init__(self,
-               checkpoint_root,
+               path_detector,
                verbose=True,
                task_killer=TaskKiller,
                executor_detector=ExecutorDetector,
@@ -116,22 +124,29 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
     self._slave_id = None  # cache the slave ID provided by the slave
     self._task_id = None  # the task_id currently being executed by the ThermosGCExecutor, if any
     self._start_time = None  # the start time of a task currently being executed, if any
-    self._detector = executor_detector()
-    self._collector = task_garbage_collector(root=checkpoint_root)
+    self._executor_detector = executor_detector()
+    self._collector_class = task_garbage_collector
     self._clock = clock
     self._task_killer = task_killer
-    self._checkpoint_root = checkpoint_root
+    if not isinstance(path_detector, PathDetector):
+      raise TypeError('ThermosGCExecutor expects a path_detector of type PathDetector, got %s' %
+          type(path_detector))
+    self._path_detector = path_detector
     self._dropped_tasks = AtomicGauge('dropped_tasks')
     self.metrics.register(self._dropped_tasks)
 
-  def _runner_ckpt(self, task_id):
-    """Return the runner checkpoint file for a given task_id."""
-    return TaskPath(root=self._checkpoint_root, task_id=task_id).getpath('runner_checkpoint')
+  def _runner_ckpt(self, task):
+    """Return the runner checkpoint file for a given task.
 
-  def _terminate_task(self, task_id, kill=True):
+    :param task: The task whose checkpoint path should be retrieved
+    :type task: :class:`RootedTask` instance
+    """
+    return TaskPath(root=task.checkpoint_root, task_id=task.task_id).getpath('runner_checkpoint')
+
+  def _terminate_task(self, task, kill=True):
     """Terminate a task using the associated task killer. Returns a boolean indicating success."""
-    killer = self._task_killer(task_id, self._checkpoint_root)
-    self.log('Terminating %s...' % task_id)
+    killer = self._task_killer(task.task_id, task.checkpoint_root)
+    self.log('Terminating %s...' % task.task_id)
     runner_terminate = killer.kill if kill else killer.lose
     try:
       runner_terminate(force=True)
@@ -141,30 +156,37 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
       return False
 
   def partition_tasks(self):
-    """Return active/finished tasks as discovered from the checkpoint root."""
-    detector = TaskDetector(root=self._checkpoint_root)
-    active_tasks = set(t_id for _, t_id in detector.get_task_ids(state='active'))
-    finished_tasks = set(t_id for _, t_id in detector.get_task_ids(state='finished'))
+    """Return active/finished tasks as discovered from the checkpoint roots."""
+    active_tasks, finished_tasks = set(), set()
+
+    for checkpoint_root in self._path_detector.get_paths():
+      detector = TaskDetector(root=checkpoint_root)
+
+      active_tasks.update(RootedTask(checkpoint_root, task_id)
+          for _, task_id in detector.get_task_ids(state='active'))
+      finished_tasks.update(RootedTask(checkpoint_root, task_id)
+          for _, task_id in detector.get_task_ids(state='finished'))
+
     return active_tasks, finished_tasks
 
-  def get_states(self, task_id):
+  def get_states(self, task):
     """Returns the (timestamp, status) tuples of the task or [] if could not replay."""
-    statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task_id))
+    statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task))
     try:
       return [(state.timestamp_ms / 1000.0, state.state) for state in statuses]
     except CheckpointDispatcher.ErrorRecoveringState:
       return []
 
-  def get_sandbox(self, task_id):
+  def get_sandbox(self, task):
     """Returns the sandbox of the task, or None if it has not yet been initialized."""
     try:
-      for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task_id)):
+      for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task)):
         if update.runner_header and update.runner_header.sandbox:
           return update.runner_header.sandbox
     except CheckpointDispatcher.ErrorRecoveringState:
       return None
 
-  def maybe_terminate_unknown_task(self, task_id):
+  def maybe_terminate_unknown_task(self, task):
     """Terminate a task if we believe the scheduler doesn't know about it.
 
        It's possible for the scheduler to queue a GC and launch a task afterwards, in which
@@ -174,29 +196,29 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
        Returns:
          boolean indicating whether the task was terminated
     """
-    states = self.get_states(task_id)
+    states = self.get_states(task)
     if states:
       task_start_time, _ = states[0]
       if self._start_time - task_start_time > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-        return self._terminate_task(task_id)
+        return self._terminate_task(task)
     return False
 
-  def should_gc_task(self, task_id):
+  def should_gc_task(self, task):
     """Check if a possibly-corrupt task should be locally GCed
 
       A task should be GCed if its checkpoint stream appears to be corrupted and the kill age
       threshold is exceeded.
 
        Returns:
-         set, containing the task_id if it should be marked for local GC, or empty otherwise
+         set, containing the task if it should be marked for local GC, or empty otherwise
     """
-    runner_ckpt = self._runner_ckpt(task_id)
+    runner_ckpt = self._runner_ckpt(task)
     if not os.path.exists(runner_ckpt):
       return set()
     latest_update = os.path.getmtime(runner_ckpt)
     if self._start_time - latest_update > self.MINIMUM_KILL_AGE.as_(Time.SECONDS):
-      self.log('Got corrupt checkpoint file for %s - marking for local GC' % task_id)
-      return set([task_id])
+      self.log('Got corrupt checkpoint file for %s - marking for local GC' % task.task_id)
+      return set([task])
     else:
       self.log('Checkpoint file unreadable, but not yet beyond MINIMUM_KILL_AGE threshold')
       return set()
@@ -227,27 +249,26 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
          !EXISTS  | TERMINAL             => delete
 
       Returns tuple of (local_gc, remote_gc, updates), where:
-        local_gc - set of task_ids to be GCed locally
+        local_gc - set of RootedTasks to be GCed locally
         remote_gc - set of task_ids to be deleted on the scheduler
         updates - dictionary of updates sent to the scheduler (task_id: ScheduleStatus)
     """
     def partition(rt):
       active, starting, finished = set(), set(), set()
-      for task_id, schedule_status in rt.items():
+      for task, schedule_status in rt.items():
         if schedule_status in TERMINAL_STATES:
-          finished.add(task_id)
+          finished.add(task)
         elif (schedule_status == ScheduleStatus.STARTING or
               schedule_status == ScheduleStatus.ASSIGNED):
-          starting.add(task_id)
+          starting.add(task)
         else:
-          active.add(task_id)
+          active.add(task)
       return active, starting, finished
 
     local_active, local_finished = self.partition_tasks()
     sched_active, sched_starting, sched_finished = partition(retained_tasks)
-    local_task_ids = local_active | local_finished
-    sched_task_ids = sched_active | sched_starting | sched_finished
-    all_task_ids = local_task_ids | sched_task_ids
+    local_tasks = local_active | local_finished
+    sched_tasks = sched_active | sched_starting | sched_finished
 
     self.log('Told to retain the following task ids:')
     for task_id, schedule_status in retained_tasks.items():
@@ -255,51 +276,59 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
           (task_id, ScheduleStatus._VALUES_TO_NAMES.get(schedule_status, 'UNKNOWN')))
 
     self.log('Local active tasks:')
-    for task_id in local_active:
-      self.log('  => %s' % task_id)
+    for task in local_active:
+      self.log('  => %s' % task.task_id)
 
     self.log('Local finished tasks:')
-    for task_id in local_finished:
-      self.log('  => %s' % task_id)
+    for task in local_finished:
+      self.log('  => %s' % task.task_id)
 
-    local_gc, remote_gc = set(), set()
+    local_gc, remote_gc_ids = set(), set()
     updates = {}
 
-    for task_id in all_task_ids:
-      if task_id in local_active and task_id not in (sched_active | sched_starting):
-        self.log('Inspecting task %s for termination.' % task_id)
-        if not self.maybe_terminate_unknown_task(task_id):
-          local_gc.update(self.should_gc_task(task_id))
-      if task_id in local_finished and task_id not in sched_task_ids:
-        self.log('Queueing task %s for local deletion.' % task_id)
-        local_gc.add(task_id)
-      if task_id in local_finished and task_id in (sched_active | sched_starting):
-        self.log('Task %s finished but scheduler thinks active/starting.' % task_id)
-        states = self.get_states(task_id)
+    for task in local_active:
+      if task.task_id not in (sched_active | sched_starting):
+        self.log('Inspecting task %s for termination.' % task.task_id)
+        if not self.maybe_terminate_unknown_task(task):
+          local_gc.update(self.should_gc_task(task))
+
+    for task in local_finished:
+      if task.task_id not in sched_tasks:
+        self.log('Queueing task %s for local deletion.' % task.task_id)
+        local_gc.add(task)
+      if task.task_id in (sched_active | sched_starting):
+        self.log('Task %s finished but scheduler thinks active/starting.' % task.task_id)
+        states = self.get_states(task)
         if states:
           _, last_state = states[-1]
-          updates[task_id] = THERMOS_TO_TWITTER_STATES.get(
-              last_state,
-              ScheduleStatus.LOST)
+          updates[task.task_id] = THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.LOST)
           self.send_update(
               driver,
-              task_id,
+              task.task_id,
               THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb2.TASK_LOST),
               'Task finish detected by GC executor.')
         else:
-          local_gc.update(self.should_gc_task(task_id))
-      if task_id in sched_finished and task_id not in local_task_ids:
+          local_gc.update(self.should_gc_task(task))
+
+    local_task_ids = set(task.task_id for task in local_tasks)
+
+    for task_id in sched_finished:
+      if task_id not in local_task_ids:
         self.log('Queueing task %s for remote deletion.' % task_id)
-        remote_gc.add(task_id)
-      if task_id not in local_task_ids and task_id in sched_active:
+        remote_gc_ids.add(task_id)
+
+    for task_id in sched_active:
+      if task_id not in local_task_ids:
         self.log('Know nothing about task %s, telling scheduler of LOSS.' % task_id)
         updates[task_id] = ScheduleStatus.LOST
         self.send_update(
             driver, task_id, mesos_pb2.TASK_LOST, 'GC executor found no trace of task.')
-      if task_id not in local_task_ids and task_id in sched_starting:
+
+    for task_id in sched_starting:
+      if task_id not in local_task_ids:
         self.log('Know nothing about task %s, but scheduler says STARTING - passing' % task_id)
 
-    return local_gc, remote_gc, updates
+    return local_gc, remote_gc_ids, updates
 
   def clean_orphans(self, driver):
     """Inspect checkpoints for trees that have been kill -9'ed but not properly cleaned up."""
@@ -307,17 +336,17 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
     active_tasks, _ = self.partition_tasks()
     updates = {}
 
-    inspector = CheckpointInspector(self._checkpoint_root)
-
     def is_our_process(process, uid, timestamp):
       if process.uids().real != uid:
         return False
       estimated_start_time = self._clock.time() - process.create_time()
       return abs(timestamp - estimated_start_time) < self.MAX_PID_TIME_DRIFT.as_(Time.SECONDS)
 
-    for task_id in active_tasks:
-      self.log('Inspecting running task: %s' % task_id)
-      inspection = inspector.inspect(task_id)
+    for task in active_tasks:
+      inspector = CheckpointInspector(task.checkpoint_root)
+
+      self.log('Inspecting running task: %s' % task.task_id)
+      inspection = inspector.inspect(task.task_id)
       if not inspection:
         self.log('  - Error inspecting task runner')
         continue
@@ -339,7 +368,7 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
         self.log('  - Error sampling runner process [pid=%s]: %s' % (runner_pid, err))
         continue
       try:
-        latest_update = os.path.getmtime(self._runner_ckpt(task_id))
+        latest_update = os.path.getmtime(self._runner_ckpt(task))
       except (IOError, OSError) as err:
         self.log('  - Error accessing runner ckpt: %s' % err)
         continue
@@ -348,34 +377,38 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
         continue
       self.log('  - Runner is dead but beyond LOST threshold: %.1fs' % (
           self._clock.time() - latest_update))
-      if self._terminate_task(task_id, kill=False):
-        updates[task_id] = ScheduleStatus.LOST
+      if self._terminate_task(task, kill=False):
+        updates[task.task_id] = ScheduleStatus.LOST
         self.send_update(
-            driver, task_id, mesos_pb2.TASK_LOST, 'GC executor detected failed task runner.')
+            driver, task.task_id, mesos_pb2.TASK_LOST, 'GC executor detected failed task runner.')
 
     return updates
 
-  def _erase_sandbox(self, task_id):
-    # TODO(wickman) Only mesos should be in the business of garbage collecting sandboxes.
-    header_sandbox = self.get_sandbox(task_id)
+  def _erase_sandbox(self, task):
+    header_sandbox = self.get_sandbox(task)
     directory_sandbox = DirectorySandbox(header_sandbox) if header_sandbox else None
     if directory_sandbox and directory_sandbox.exists():
-      self.log('Destroying DirectorySandbox for %s' % task_id)
+      self.log('Destroying DirectorySandbox for %s' % task.task_id)
       try:
         directory_sandbox.destroy()
       except DirectorySandbox.Error as e:
         self.log('Failed to destroy DirectorySandbox: %s' % e)
     else:
-      self.log('Found no sandboxes for %s' % task_id)
+      self.log('Found no sandboxes for %s' % task.task_id)
 
-  def _gc(self, task_id):
+  def _gc(self, task):
     """Erase the sandbox, logs and metadata of the given task."""
-    self.log('Erasing sandbox for %s' % task_id)
-    self._erase_sandbox(task_id)
-    self.log('Erasing logs for %s' % task_id)
-    self._collector.erase_logs(task_id)
-    self.log('Erasing metadata for %s' % task_id)
-    self._collector.erase_metadata(task_id)
+
+    self.log('Erasing sandbox for %s' % task.task_id)
+    self._erase_sandbox(task)
+
+    collector = self._collector_class(task.checkpoint_root, task.task_id)
+
+    self.log('Erasing logs for %s' % task.task_id)
+    collector.erase_logs()
+
+    self.log('Erasing metadata for %s' % task.task_id)
+    collector.erase_metadata()
 
   def garbage_collect(self, force_delete=frozenset()):
     """Garbage collect tasks on the system no longer active or in the supplied force_delete.
@@ -390,18 +423,23 @@ class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
         self.log('  %s' % r_e)
     else:
       self.log('  None')
-    for task_id in (active_tasks - retained_executors):
-      self.log('ERROR: Active task %s had its executor sandbox pulled.' % task_id)
-    gc_tasks = (finished_tasks - retained_executors) | force_delete
+    for task in active_tasks:
+      if task.task_id not in retained_executors:
+        self.log('ERROR: Active task %s had its executor sandbox pulled.' % task.task_id)
+    gc_tasks = set()
+    for task in finished_tasks:
+      if task.task_id not in retained_executors:
+        gc_tasks.add(task)
+    gc_tasks.update(force_delete)
     for gc_task in gc_tasks:
       self._gc(gc_task)
-    return gc_tasks
+    return set(task.task_id for task in gc_tasks)
 
   @property
   def linked_executors(self):
     """Generator yielding the executor sandboxes detected on the system."""
     thermos_executor_prefix = 'thermos-'
-    for executor in self._detector:
+    for executor in self._executor_detector:
       # It's possible for just the 'latest' symlink to be present but no run directories.
       # This indicates that the task has been fully garbage collected.
       if executor.executor_id.startswith(thermos_executor_prefix) and executor.run != 'latest':

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/thermos/bin/thermos.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/bin/thermos.py b/src/main/python/apache/thermos/bin/thermos.py
index 161bbdb..0853a98 100644
--- a/src/main/python/apache/thermos/bin/thermos.py
+++ b/src/main/python/apache/thermos/bin/thermos.py
@@ -41,7 +41,7 @@ from apache.thermos.config.schema import Process, Resources, Task
 from apache.thermos.core.helper import TaskRunnerHelper
 from apache.thermos.core.runner import TaskRunner
 from apache.thermos.monitoring.detector import TaskDetector
-from apache.thermos.monitoring.garbage import DefaultCollector, TaskGarbageCollector
+from apache.thermos.monitoring.garbage import GarbageCollectionPolicy, TaskGarbageCollector
 from apache.thermos.monitoring.monitor import TaskMonitor
 
 from gen.apache.thermos.ttypes import ProcessState, RunnerCkpt, RunnerState, TaskState
@@ -434,13 +434,13 @@ def gc(args, options):
                     include_logs=not options.keep_logs,
                     verbose=True,
                     logger=print)
-  tgc = TaskGarbageCollector(root=options.root)
-
   if args:
-    gc_tasks = tasks_from_re(args, options.root, state='finished')
+    gc_tasks = [(options.root, task_id)
+        for task_id in tasks_from_re(args, options.root, state='finished')]
   else:
     print('No task ids specified, using default collector.')
-    gc_tasks = [task.task_id for task in DefaultCollector(tgc, **gc_options).run()]
+    gc_tasks = [(task.checkpoint_root, task.task_id)
+        for task in GarbageCollectionPolicy(**gc_options).run()]
 
   if not gc_tasks:
     print('No tasks to garbage collect.  Exiting')
@@ -457,18 +457,19 @@ def gc(args, options):
     value = raw_input("Continue [y/N]? ") or 'N'
   if value.lower() == 'y':
     print('Running gc...')
-    tgc = TaskGarbageCollector(root=options.root)
-    for task in gc_tasks:
-      print('  Task %s ' % task, end='')
+
+    for checkpoint_root, task_id in gc_tasks:
+      tgc = TaskGarbageCollector(checkpoint_root, task_id)
+      print('  Task %s ' % task_id, end='')
       print('data (%s) ' % ('keeping' if options.keep_data else 'deleting'), end='')
       print('logs (%s) ' % ('keeping' if options.keep_logs else 'deleting'), end='')
       print('metadata (%s) ' % ('keeping' if options.keep_metadata else 'deleting'))
       if not options.keep_data:
-        maybe(tgc.erase_data, task)
+        maybe(tgc.erase_data)
       if not options.keep_logs:
-        maybe(tgc.erase_logs, task)
+        maybe(tgc.erase_logs)
       if not options.keep_metadata:
-        maybe(tgc.erase_metadata, task)
+        maybe(tgc.erase_metadata)
       print('done.')
   else:
     print('Cancelling gc.')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/main/python/apache/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py
index 69bf8e4..53bb034 100644
--- a/src/main/python/apache/thermos/monitoring/garbage.py
+++ b/src/main/python/apache/thermos/monitoring/garbage.py
@@ -15,11 +15,9 @@
 import os
 import sys
 import time
-from abc import abstractmethod
 from collections import namedtuple
 
 from twitter.common.dirutil import safe_bsize, safe_delete, safe_rmtree
-from twitter.common.lang import Interface
 from twitter.common.quantity import Amount, Data, Time
 
 from apache.thermos.common.ckpt import CheckpointDispatcher
@@ -29,45 +27,43 @@ from .detector import TaskDetector
 
 
 class TaskGarbageCollector(object):
-  def __init__(self, root):
-    self._root = root
-    self._detector = TaskDetector(root=self._root)
-    self._states = {}
+  """A task wrapper to manage its sandbox and log state."""
 
-  def state(self, task_id):
-    if task_id not in self._states:
-      self._states[task_id] = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
-    return self._states[task_id]
+  def __init__(self, checkpoint_root, task_id):
+    """
+    :param checkpoint_root: The checkpoint root under which to find the given task.
+    :param task_id: The task_id of the task whose state we wish to manage.
+    """
 
-  def get_age(self, task_id):
-    return os.path.getmtime(self._detector.get_checkpoint(task_id))
+    self._detector = TaskDetector(checkpoint_root)
+    self._task_id = task_id
+    self._state = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
 
-  def get_finished_tasks(self):
-    return [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
+  def get_age(self):
+    return os.path.getmtime(self._detector.get_checkpoint(self._task_id))
 
-  def get_metadata(self, task_id, with_size=True):
-    runner_ckpt = self._detector.get_checkpoint(task_id)
-    process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(task_id)]
-    json_spec = TaskPath(root=self._root, task_id=task_id, state='finished').getpath('task_path')
+  def get_metadata(self, with_size=True):
+    runner_ckpt = self._detector.get_checkpoint(self._task_id)
+    process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(self._task_id)]
+    json_spec = TaskPath(
+        root=self._root, task_id=self._task_id, state='finished').getpath('task_path')
     for path in [json_spec, runner_ckpt] + process_ckpts:
       if with_size:
         yield path, safe_bsize(path)
       else:
         yield path
 
-  def get_logs(self, task_id, with_size=True):
-    state = self.state(task_id)
-    if state and state.header:
-      for path in self._detector.get_process_logs(task_id, state.header.log_dir):
+  def get_logs(self, with_size=True):
+    if self._state.header:
+      for path in self._detector.get_process_logs(self._task_id, self._state.header.log_dir):
         if with_size:
           yield path, safe_bsize(path)
         else:
           yield path
 
-  def get_data(self, task_id, with_size=True):
-    state = self.state(task_id)
-    if state and state.header and state.header.sandbox:
-      for root, dirs, files in os.walk(state.header.sandbox):
+  def get_data(self, with_size=True):
+    if self._state.header and self._state.header.sandbox:
+      for root, dirs, files in os.walk(self._state.header.sandbox):
         for file in files:
           filename = os.path.join(root, file)
           if with_size:
@@ -75,53 +71,40 @@ class TaskGarbageCollector(object):
           else:
             yield filename
 
-  def erase_task(self, task_id):
-    self.erase_data(task_id)
-    self.erase_logs(task_id)
-    self.erase_metadata(task_id)
+  def erase_task(self):
+    self.erase_data()
+    self.erase_logs()
+    self.erase_metadata()
 
-  def erase_metadata(self, task_id):
-    for fn in self.get_metadata(task_id, with_size=False):
+  def erase_metadata(self):
+    for fn in self.get_metadata(with_size=False):
       safe_delete(fn)
-    safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))
+    safe_rmtree(TaskPath(root=self._root, task_id=self._task_id).getpath('checkpoint_path'))
 
-  def erase_logs(self, task_id):
-    for fn in self.get_logs(task_id, with_size=False):
+  def erase_logs(self):
+    for fn in self.get_logs(with_size=False):
       safe_delete(fn)
-    state = self.state(task_id)
-    if state and state.header:
-      safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
-                  .getpath('process_logbase'))
-
-  def erase_data(self, task_id):
-    # TODO(wickman)
-    # This could be potentially dangerous if somebody naively runs their sandboxes in e.g.
-    # $HOME or / or similar.  Perhaps put a guard somewhere?
-    for fn in self.get_data(task_id, with_size=False):
-      os.unlink(fn)
-    state = self.state(task_id)
-    if state and state.header and state.header.sandbox:
-      safe_rmtree(state.header.sandbox)
-
-
-class TaskGarbageCollectionPolicy(Interface):
-  def __init__(self, collector):
-    if not isinstance(collector, TaskGarbageCollector):
-      raise ValueError(
-          "Expected collector to be a TaskGarbageCollector, got %s" % collector.__class__.__name__)
-    self._collector = collector
-
-  @property
-  def collector(self):
-    return self._collector
-
-  @abstractmethod
-  def run(self):
-    """Returns a list of task_ids that should be garbage collected given the specified policy."""
+    if self._state.header:
+      path = TaskPath(root=self._root, task_id=self._task_id, log_dir=self._state.header.log_dir)
+      safe_rmtree(path.getpath('process_logbase'))
 
-
-class DefaultCollector(TaskGarbageCollectionPolicy):
-  def __init__(self, collector, **kw):
+  def erase_data(self):
+    for fn in self.get_data(with_size=False):
+      os.unlink(fn)
+    if self._state.header and self._state.header.sandbox:
+      safe_rmtree(self._state.header.sandbox)
+
+
+class GarbageCollectionPolicy(object):
+  def __init__(self,
+               path_detector,
+               max_age=Amount(10 ** 10, Time.DAYS),
+               max_space=Amount(10 ** 10, Data.TB),
+               max_tasks=10 ** 10,
+               include_metadata=True,
+               include_logs=True,
+               verbose=False,
+               logger=sys.stdout.write):
     """
       Default garbage collection policy.
 
@@ -136,37 +119,53 @@ class DefaultCollector(TaskGarbageCollectionPolicy):
         verbose: boolean (whether or not to log)  [default: False]
         logger: callable (function to call with log messages) [default: sys.stdout.write]
     """
-    self._max_age = kw.get('max_age', Amount(10 ** 10, Time.DAYS))
-    self._max_space = kw.get('max_space', Amount(10 ** 10, Data.TB))
-    self._max_tasks = kw.get('max_tasks', 10 ** 10)
-    self._include_metadata = kw.get('include_metadata', True)
-    self._include_logs = kw.get('include_logs', True)
-    self._verbose = kw.get('verbose', False)
-    self._logger = kw.get('logger', sys.stdout.write)
-    TaskGarbageCollectionPolicy.__init__(self, collector)
+    self._path_detector = path_detector
+    self._max_age = max_age
+    self._max_space = max_space
+    self._max_tasks = max_tasks
+    self._include_metadata = include_metadata
+    self._include_logs = include_logs
+    self._verbose = verbose
+    self._logger = logger
 
   def log(self, msg):
     if self._verbose:
       self._logger(msg)
 
+  def get_finished_tasks(self):
+    """Yields (checkpoint_root, task_id) for finished tasks."""
+
+    for checkpoint_root in self._path_detector.get_paths():
+      for task_id in TaskDetector(checkpoint_root).get_task_ids(state='finished'):
+        yield (checkpoint_root, task_id)
+
   def run(self):
     tasks = []
     now = time.time()
 
-    TaskTuple = namedtuple('TaskTuple', 'task_id age metadata_size log_size data_size')
-    for task_id in self.collector.get_finished_tasks():
-      age = Amount(int(now - self.collector.get_age(task_id)), Time.SECONDS)
+    # age: The time (in seconds) since the last task transition to/from ACTIVE/FINISHED
+    # metadata_size: The size of the thermos checkpoint records for this task
+    # log_size: The size of the stdout/stderr logs for this task's processes
+    # data_size: The size of the sandbox of this task.
+    TaskTuple = namedtuple('TaskTuple',
+        'checkpoint_root task_id age metadata_size log_size data_size')
+
+    for checkpoint_root, task_id in self.get_finished_tasks():
+      collector = TaskGarbageCollector(checkpoint_root, task_id)
+
+      age = Amount(int(now - collector.get_age()), Time.SECONDS)
       self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
-      metadata_size = Amount(sum(sz for _, sz in self.collector.get_metadata(task_id)), Data.BYTES)
+      metadata_size = Amount(sum(sz for _, sz in collector.get_metadata()), Data.BYTES)
       self.log('  metadata %.1fKB ' % metadata_size.as_(Data.KB))
-      log_size = Amount(sum(sz for _, sz in self.collector.get_logs(task_id)), Data.BYTES)
+      log_size = Amount(sum(sz for _, sz in collector.get_logs()), Data.BYTES)
       self.log('  logs %.1fKB ' % log_size.as_(Data.KB))
-      data_size = Amount(sum(sz for _, sz in self.collector.get_data(task_id)), Data.BYTES)
+      data_size = Amount(sum(sz for _, sz in collector.get_data()), Data.BYTES)
       self.log('  data %.1fMB ' % data_size.as_(Data.MB))
-      tasks.append(TaskTuple(task_id, age, metadata_size, log_size, data_size))
+      tasks.append(TaskTuple(checkpoint_root, task_id, age, metadata_size, log_size, data_size))
 
     gc_tasks = set()
     gc_tasks.update(task for task in tasks if task.age > self._max_age)
+
     self.log('After age filter: %s tasks' % len(gc_tasks))
 
     def total_gc_size(task):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/test/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/bin/BUILD b/src/test/python/apache/aurora/executor/bin/BUILD
new file mode 100644
index 0000000..65a8b6c
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/bin/BUILD
@@ -0,0 +1,23 @@
+python_test_suite(
+  name = 'all',
+  dependencies = [
+    ':gc_executor_entry_point',
+    ':thermos_executor_entry_point',
+  ]
+)
+
+python_tests(
+  name = 'gc_executor_entry_point',
+  sources = ['test_gc_executor_entry_point.py'],
+  dependencies = [
+    'src/main/python/apache/aurora/executor/bin:gc_executor_source',
+  ],
+)
+
+python_tests(
+  name = 'thermos_executor_entry_point',
+  sources = ['test_thermos_executor_entry_point.py'],
+  dependencies = [
+    'src/main/python/apache/aurora/executor/bin:thermos_executor_source',
+  ],
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py b/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
new file mode 100644
index 0000000..4446e55
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py
@@ -0,0 +1,3 @@
+def test_gc_executor_valid_import_dependencies():
+  from apache.aurora.executor.bin.gc_executor_main import proxy_main
+  assert proxy_main is not None

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/test/python/apache/aurora/executor/bin/test_thermos_executor_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/bin/test_thermos_executor_entry_point.py b/src/test/python/apache/aurora/executor/bin/test_thermos_executor_entry_point.py
new file mode 100644
index 0000000..841cef0
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/bin/test_thermos_executor_entry_point.py
@@ -0,0 +1,3 @@
+def test_thermos_executor_valid_import_dependencies():
+  from apache.aurora.executor.bin.thermos_executor_main import proxy_main
+  assert proxy_main is not None

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7de34614/src/test/python/apache/aurora/executor/test_gc_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_gc_executor.py b/src/test/python/apache/aurora/executor/test_gc_executor.py
index 3a0ff64..0f0b4a3 100644
--- a/src/test/python/apache/aurora/executor/test_gc_executor.py
+++ b/src/test/python/apache/aurora/executor/test_gc_executor.py
@@ -32,25 +32,34 @@ from twitter.common.dirutil import safe_rmtree
 from twitter.common.quantity import Amount, Time
 from twitter.common.testing.clock import ThreadedClock
 
-from apache.aurora.executor.gc_executor import ThermosGCExecutor
+from apache.aurora.executor.gc_executor import RootedTask, ThermosGCExecutor
 from apache.thermos.common.path import TaskPath
 from apache.thermos.config.schema import SimpleTask
 from apache.thermos.core.runner import TaskRunner
+from apache.thermos.monitoring.detector import FixedPathDetector
 
 from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES
 from gen.apache.aurora.api.ttypes import ScheduleStatus
 from gen.apache.aurora.comm.ttypes import AdjustRetainedTasks
 from gen.apache.thermos.ttypes import ProcessState, TaskState
 
-ACTIVE_TASKS = ('sleep60-lost',)
+FAKE_ROOT = 'fake_root'
+
+
+def make_task(task_id):
+  return RootedTask(FAKE_ROOT, task_id)
+
+
+ACTIVE_TASKS = (make_task('sleep60-lost'),)
+
 
 FINISHED_TASKS = {
-  'failure': ProcessState.SUCCESS,
-  'failure_limit': ProcessState.FAILED,
-  'hello_world': ProcessState.SUCCESS,
-  'ordering': ProcessState.SUCCESS,
-  'ports': ProcessState.SUCCESS,
-  'sleep60': ProcessState.KILLED
+  make_task('failure'): ProcessState.SUCCESS,
+  make_task('failure_limit'): ProcessState.FAILED,
+  make_task('hello_world'): ProcessState.SUCCESS,
+  make_task('ordering'): ProcessState.SUCCESS,
+  make_task('ports'): ProcessState.SUCCESS,
+  make_task('sleep60'): ProcessState.KILLED
 }
 
 # TODO(wickman) These should be constant sets in the Thermos thrift
@@ -138,21 +147,24 @@ class ThinTestThermosGCExecutor(ThermosGCExecutor):
     self._kills = set()
     self._losses = set()
     self._gcs = set()
-    ThermosGCExecutor.__init__(self, checkpoint_root, clock=ThreadedClock(time.time()),
+    ThermosGCExecutor.__init__(
+        self,
+        FixedPathDetector(checkpoint_root),
+        clock=ThreadedClock(time.time()),
         executor_detector=lambda: list)
 
   @property
   def gcs(self):
     return self._gcs
 
-  def _gc(self, task_id):
-    self._gcs.add(task_id)
+  def _gc(self, task):
+    self._gcs.add(task)
 
-  def _terminate_task(self, task_id, kill=True):
+  def _terminate_task(self, task, kill=True):
     if kill:
-      self._kills.add(task_id)
+      self._kills.add(task)
     else:
-      self._losses.add(task_id)
+      self._losses.add(task)
     return True
 
   @property
@@ -166,7 +178,7 @@ class ThickTestThermosGCExecutor(ThinTestThermosGCExecutor):
     self._finished_tasks = finished_tasks
     self._corrupt_tasks = corrupt_tasks
     self._maybe_terminate = set()
-    ThinTestThermosGCExecutor.__init__(self, None, active_executors)
+    ThinTestThermosGCExecutor.__init__(self, FAKE_ROOT, active_executors)
 
   @property
   def results(self):
@@ -179,20 +191,20 @@ class ThickTestThermosGCExecutor(ThinTestThermosGCExecutor):
   def partition_tasks(self):
     return set(self._active_tasks.keys()), set(self._finished_tasks.keys())
 
-  def maybe_terminate_unknown_task(self, task_id):
-    self._maybe_terminate.add(task_id)
+  def maybe_terminate_unknown_task(self, task):
+    self._maybe_terminate.add(task)
 
-  def get_states(self, task_id):
-    if task_id not in self._corrupt_tasks:
-      if task_id in self._active_tasks:
-        return [(self._clock.time(), self._active_tasks[task_id])]
-      elif task_id in self._finished_tasks:
-        return [(self._clock.time(), self._finished_tasks[task_id])]
+  def get_states(self, task):
+    if task not in self._corrupt_tasks:
+      if task in self._active_tasks:
+        return [(self._clock.time(), self._active_tasks[task])]
+      elif task in self._finished_tasks:
+        return [(self._clock.time(), self._finished_tasks[task])]
     return []
 
-  def should_gc_task(self, task_id):
-    if task_id in self._corrupt_tasks:
-      return set([task_id])
+  def should_gc_task(self, task):
+    if task in self._corrupt_tasks:
+      return set([task])
     return set()
 
 
@@ -207,21 +219,21 @@ def llen(*iterables):
 def test_state_reconciliation_no_ops():
   # active vs. active
   for st0, st1 in product(THERMOS_LIVES, LIVE_STATES):
-    tgc, driver = make_pair({'foo': st0}, {})
+    tgc, driver = make_pair({make_task('foo'): st0}, {})
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (0, 0, 0)
 
   # terminal vs. terminal
   for st0, st1 in product(THERMOS_TERMINALS, TERMINAL_STATES):
-    tgc, driver = make_pair({}, {'foo': st0})
+    tgc, driver = make_pair({}, {make_task('foo'): st0})
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (0, 0, 0)
 
   # active vs. starting
   for st0, st1 in product(THERMOS_LIVES, STARTING_STATES):
-    tgc, driver = make_pair({'foo': st0}, {})
+    tgc, driver = make_pair({make_task('foo'): st0}, {})
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (0, 0, 0)
@@ -236,7 +248,7 @@ def test_state_reconciliation_no_ops():
 
 def test_state_reconciliation_active_terminal():
   for st0, st1 in product(THERMOS_LIVES, TERMINAL_STATES):
-    tgc, driver = make_pair({'foo': st0}, {})
+    tgc, driver = make_pair({make_task('foo'): st0}, {})
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 1)
     assert llen(lgc, rgc, updates) == (0, 0, 0)
@@ -244,7 +256,7 @@ def test_state_reconciliation_active_terminal():
 
 def test_state_reconciliation_active_nexist():
   for st0 in THERMOS_LIVES:
-    tgc, driver = make_pair({'foo': st0}, {})
+    tgc, driver = make_pair({make_task('foo'): st0}, {})
     lgc, rgc, updates = tgc.reconcile_states(driver, {})
     assert tgc.len_results == (0, 0, 0, 1)
     assert llen(lgc, rgc, updates) == (0, 0, 0)
@@ -252,7 +264,7 @@ def test_state_reconciliation_active_nexist():
 
 def test_state_reconciliation_terminal_active():
   for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES):
-    tgc, driver = make_pair({}, {'foo': st0})
+    tgc, driver = make_pair({}, {make_task('foo'): st0})
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (0, 0, 1)
@@ -260,7 +272,7 @@ def test_state_reconciliation_terminal_active():
 
 def test_state_reconciliation_corrupt_tasks():
   for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES):
-    tgc, driver = make_pair({}, {'foo': st0}, corrupt_tasks=['foo'])
+    tgc, driver = make_pair({}, {make_task('foo'): st0}, corrupt_tasks=[make_task('foo')])
     lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (1, 0, 0)
@@ -268,11 +280,11 @@ def test_state_reconciliation_corrupt_tasks():
 
 def test_state_reconciliation_terminal_nexist():
   for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES):
-    tgc, driver = make_pair({}, {'foo': st0})
+    tgc, driver = make_pair({}, {make_task('foo'): st0})
     lgc, rgc, updates = tgc.reconcile_states(driver, {})
     assert tgc.len_results == (0, 0, 0, 0)
     assert llen(lgc, rgc, updates) == (1, 0, 0)
-    assert lgc == set(['foo'])
+    assert lgc == set([make_task('foo')])
 
 
 def test_state_reconciliation_nexist_active():
@@ -297,9 +309,10 @@ def test_real_get_states():
     setup_tree(td)
     executor = ThinTestThermosGCExecutor(td)
     for task in FINISHED_TASKS:
-      states = executor.get_states(task)
+      real_task = RootedTask(td, task.task_id)
+      states = executor.get_states(real_task)
       assert isinstance(states, list) and len(states) > 0
-      assert executor.get_sandbox(task) is not None
+      assert executor.get_sandbox(real_task) is not None
 
 
 def wait_until_not(thing, timeout=EVENT_WAIT_TIMEOUT_SECS):
@@ -339,46 +352,54 @@ def run_gc_with(active_executors, retained_tasks, lose=False):
 
 
 def test_gc_with_loss():
-  executor, proxy_driver = run_gc_with(active_executors=set(ACTIVE_TASKS), retained_tasks={},
+  executor, proxy_driver = run_gc_with(
+      active_executors=set(task.task_id for task in ACTIVE_TASKS),
+      retained_tasks={},
       lose=True)
   assert len(executor._kills) == len(ACTIVE_TASKS)
   assert len(executor.gcs) == len(FINISHED_TASKS)
   assert len(proxy_driver.updates) >= 1
-  assert StatusUpdate(mesos_pb2.TASK_LOST, ACTIVE_TASKS[0]) in proxy_driver.updates
+  assert StatusUpdate(mesos_pb2.TASK_LOST, ACTIVE_TASKS[0].task_id) in proxy_driver.updates
 
 
 def test_gc_with_starting_task():
   executor, proxy_driver = run_gc_with(
-    active_executors=set(ACTIVE_TASKS), retained_tasks={ACTIVE_TASKS[0]: ScheduleStatus.STARTING})
+    active_executors=set(task.task_id for task in ACTIVE_TASKS),
+    retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.STARTING})
   assert len(executor._kills) == 0
   assert len(executor.gcs) == len(FINISHED_TASKS)
 
 
 def test_gc_without_task_missing():
-  executor, proxy_driver = run_gc_with(active_executors=set(ACTIVE_TASKS), retained_tasks={},
+  executor, proxy_driver = run_gc_with(
+      active_executors=set(task.task_id for task in ACTIVE_TASKS),
+      retained_tasks={},
       lose=False)
   assert len(executor._kills) == len(ACTIVE_TASKS)
   assert len(executor.gcs) == len(FINISHED_TASKS)
 
 
 def test_gc_without_loss():
-  executor, proxy_driver = run_gc_with(active_executors=set(ACTIVE_TASKS),
-      retained_tasks={ACTIVE_TASKS[0]: ScheduleStatus.RUNNING})
+  executor, proxy_driver = run_gc_with(
+      active_executors=set(task.task_id for task in ACTIVE_TASKS),
+      retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING})
   assert len(executor._kills) == 0
   assert len(executor.gcs) == len(FINISHED_TASKS)
 
 
 def test_gc_withheld():
-  executor, proxy_driver = run_gc_with(active_executors=set([ACTIVE_TASKS[0], 'failure']),
-      retained_tasks={ACTIVE_TASKS[0]: ScheduleStatus.RUNNING,
+  executor, proxy_driver = run_gc_with(
+      active_executors=set([ACTIVE_TASKS[0].task_id, 'failure']),
+      retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING,
                       'failure': ScheduleStatus.FAILED})
   assert len(executor._kills) == 0
   assert len(executor.gcs) == len(FINISHED_TASKS) - 1
 
 
 def test_gc_withheld_and_executor_missing():
-  executor, proxy_driver = run_gc_with(active_executors=set(ACTIVE_TASKS),
-      retained_tasks={ACTIVE_TASKS[0]: ScheduleStatus.RUNNING,
+  executor, proxy_driver = run_gc_with(
+      active_executors=set(task.task_id for task in ACTIVE_TASKS),
+      retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING,
                       'failure': ScheduleStatus.FAILED})
   assert len(executor._kills) == 0
   assert len(executor.gcs) == len(FINISHED_TASKS)
@@ -573,13 +594,13 @@ class TestRealGC(unittest.TestCase):
         pass
 
     class FakeTaskGarbageCollector(object):
-      def __init__(self, root):
+      def __init__(self, root, task_id):
         pass
 
-      def erase_logs(self, task_id):
+      def erase_logs(self):
         pass
 
-      def erase_metadata(self, task_id):
+      def erase_metadata(self):
         pass
 
     class FastThermosGCExecutor(ThermosGCExecutor):
@@ -587,7 +608,7 @@ class TestRealGC(unittest.TestCase):
 
     detector = functools.partial(FakeExecutorDetector, task_id) if retain else FakeExecutorDetector
     executor = FastThermosGCExecutor(
-        checkpoint_root=root,
+        path_detector=FixedPathDetector(root),
         task_killer=FakeTaskKiller,
         executor_detector=detector,
         task_garbage_collector=FakeTaskGarbageCollector,

