aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject incubator-aurora git commit: Port thermos observer to the path detector interface
Date Tue, 03 Mar 2015 23:30:47 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 2bf03dc5e -> c0b37015a


Port thermos observer to the path detector interface

This creates a new abstraction, the ObserverTaskDetector, which is
responsible for managing task state transitions on behalf of the observer.
It also adds some tests and better debug logging.

Testing Done:
Manually launched the observer and ran some thermos tasks, and also ran the
observer test suite:
  ./pants test src/test/python/apache/thermos/observer/::

Bugs closed: AURORA-1026

Reviewed at https://reviews.apache.org/r/31451/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c0b37015
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c0b37015
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c0b37015

Branch: refs/heads/master
Commit: c0b37015a9c7d37f561ad6c445ed7d4d3e218e89
Parents: 2bf03dc
Author: Brian Wickman <wickman@apache.org>
Authored: Tue Mar 3 15:30:37 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Tue Mar 3 15:30:37 2015 -0800

----------------------------------------------------------------------
 .../python/apache/aurora/executor/common/BUILD  |   1 -
 .../aurora/executor/common/resource_manager.py  |  19 +--
 .../aurora/executor/thermos_task_runner.py      |   3 +-
 src/main/python/apache/thermos/bin/thermos.py   |   5 +-
 src/main/python/apache/thermos/common/path.py   |   3 +
 .../python/apache/thermos/monitoring/disk.py    |   5 +-
 .../python/apache/thermos/monitoring/monitor.py |  28 ++--
 .../apache/thermos/monitoring/resource.py       |  45 +++---
 src/main/python/apache/thermos/observer/BUILD   |  12 +-
 .../python/apache/thermos/observer/bin/BUILD    |   3 +-
 .../thermos/observer/bin/thermos_observer.py    |   3 +-
 .../python/apache/thermos/observer/detector.py  | 102 ++++++++++++
 .../thermos/observer/http/file_browser.py       |   2 +
 .../apache/thermos/observer/observed_task.py    |  30 ++--
 .../apache/thermos/observer/task_observer.py    | 113 ++++++-------
 src/test/python/apache/aurora/executor/BUILD    |   1 -
 .../python/apache/aurora/executor/common/BUILD  |   1 +
 .../common/test_resource_manager_integration.py |  30 ++--
 .../aurora/executor/test_thermos_executor.py    |   5 +-
 .../python/apache/thermos/bin/test_thermos.py   |   5 +-
 .../apache/thermos/core/test_staged_kill.py     |  12 +-
 .../apache/thermos/monitoring/test_resource.py  |   9 +-
 src/test/python/apache/thermos/observer/BUILD   |  30 ++++
 .../apache/thermos/observer/test_detector.py    | 161 +++++++++++++++++++
 24 files changed, 467 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/BUILD b/src/main/python/apache/aurora/executor/common/BUILD
index 8cef41d..3d94792 100644
--- a/src/main/python/apache/aurora/executor/common/BUILD
+++ b/src/main/python/apache/aurora/executor/common/BUILD
@@ -134,7 +134,6 @@ python_library(
     '3rdparty/python:twitter.common.dirutil',
     '3rdparty/python:twitter.common.exceptions',
     '3rdparty/python:twitter.common.log',
-    '3rdparty/python:twitter.common.quantity',
     'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
     'src/main/python/apache/aurora/executor/common:status_checker',
     'src/main/python/apache/aurora/executor/common:task_info',

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/aurora/executor/common/resource_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/resource_manager.py b/src/main/python/apache/aurora/executor/common/resource_manager.py
index 08e02e4..b7dc40d 100644
--- a/src/main/python/apache/aurora/executor/common/resource_manager.py
+++ b/src/main/python/apache/aurora/executor/common/resource_manager.py
@@ -16,7 +16,6 @@ import threading
 
 from mesos.interface import mesos_pb2
 from twitter.common.metrics import LambdaGauge
-from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.status_checker import (
     StatusChecker,
@@ -24,8 +23,6 @@ from apache.aurora.executor.common.status_checker import (
     StatusResult
 )
 from apache.aurora.executor.common.task_info import mesos_task_instance_from_assigned_task
-from apache.thermos.common.path import TaskPath
-from apache.thermos.monitoring.disk import DiskCollector
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.resource import TaskResourceMonitor
 
@@ -93,22 +90,16 @@ class ResourceManager(StatusChecker):
 
 
 class ResourceManagerProvider(StatusCheckerProvider):
-  def __init__(self,
-               checkpoint_root,
-               disk_collector=DiskCollector,
-               disk_collection_interval=Amount(1, Time.MINUTES)):
+  def __init__(self, checkpoint_root, **resource_monitor_options):
     self._checkpoint_root = checkpoint_root
-    self._disk_collector = disk_collector
-    self._disk_collection_interval = disk_collection_interval
+    self._resource_monitor_options = resource_monitor_options
 
   def from_assigned_task(self, assigned_task, sandbox):
     task_id = assigned_task.taskId
     resources = mesos_task_instance_from_assigned_task(assigned_task).task().resources()
-    task_path = TaskPath(root=self._checkpoint_root, task_id=task_id)
-    task_monitor = TaskMonitor(task_path, task_id)
+    task_monitor = TaskMonitor(self._checkpoint_root, task_id)
     resource_monitor = TaskResourceMonitor(
+        task_id,
         task_monitor,
-        sandbox.root,
-        disk_collector=self._disk_collector,
-        disk_collection_interval=self._disk_collection_interval)
+        **self._resource_monitor_options)
     return ResourceManager(resources, resource_monitor)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
index 7a28e32..505a1e6 100644
--- a/src/main/python/apache/aurora/executor/thermos_task_runner.py
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -31,7 +31,6 @@ from twitter.common.quantity import Amount, Time
 
 from apache.aurora.common.http_signaler import HttpSignaler
 from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
-from apache.thermos.common.path import TaskPath
 from apache.thermos.common.statuses import (
     INTERNAL_ERROR,
     INVALID_TASK,
@@ -271,7 +270,7 @@ class ThermosTaskRunner(TaskRunner):
     """Fork the task runner and return once the underlying task is running, up to timeout."""
     self.forking.set()
 
-    self._monitor = TaskMonitor(TaskPath(root=self._checkpoint_root), self._task_id)
+    self._monitor = TaskMonitor(self._checkpoint_root, self._task_id)
 
     cmdline_args = self._cmdline()
     log.info('Forking off runner with cmdline: %s' % ' '.join(cmdline_args))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/bin/thermos.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/bin/thermos.py b/src/main/python/apache/thermos/bin/thermos.py
index 0853a98..b8a6388 100644
--- a/src/main/python/apache/thermos/bin/thermos.py
+++ b/src/main/python/apache/thermos/bin/thermos.py
@@ -22,6 +22,7 @@ import pprint
 import pwd
 import re
 import sys
+import tempfile
 import time
 
 from pystachio.naming import frozendict
@@ -285,7 +286,7 @@ def simplerun(args, options):
 
   _really_run(thermos_task,
               options.root,
-              None,
+              tempfile.mkdtemp(),
               task_id=options.task_id,
               user=options.user,
               prebound_ports=options.prebound_ports,
@@ -594,7 +595,7 @@ def tail(args, options):
      run=run, log_dir=log_dir).getpath('process_logdir')
   logfile = os.path.join(logdir, 'stderr' if options.use_stderr else 'stdout')
 
-  monitor = TaskMonitor(TaskPath(root=options.root), args[0])
+  monitor = TaskMonitor(options.root, args[0])
   def log_is_active():
     active_processes = monitor.get_active_processes()
     for process_status, process_run in active_processes:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/common/path.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/path.py b/src/main/python/apache/thermos/common/path.py
index 846f507..4359a51 100644
--- a/src/main/python/apache/thermos/common/path.py
+++ b/src/main/python/apache/thermos/common/path.py
@@ -83,6 +83,9 @@ class TaskPath(object):
     self._data = dict((key, '%%(%s)s' % key) for key in keys)
     self._data.update(kw)
 
+  def __hash__(self):
+    return hash(tuple(self._data.items()))
+
   def given(self, **kw):
     """ Perform further interpolation of the templates given the kwargs """
     eval_dict = dict(self._data)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
index 175ed3a..ccf6d6a 100644
--- a/src/main/python/apache/thermos/monitoring/disk.py
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -56,9 +56,10 @@ class DiskCollectorThread(ExceptionalThread):
     self.daemon = True
 
   def run(self):
-    log.debug("DiskCollectorThread: starting collection of %s" % self.path)
+    start = time.time()
     self.value = du(self.path)
-    log.debug("DiskCollectorThread: finished collection of %s" % self.path)
+    log.debug("DiskCollectorThread: finished collection of %s in %.1fms" % (
+        self.path, 1000.0 * (time.time() - start)))
     self.event.set()
 
   def finished(self):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/monitoring/monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py
index 11423bc..d77703e 100644
--- a/src/main/python/apache/thermos/monitoring/monitor.py
+++ b/src/main/python/apache/thermos/monitoring/monitor.py
@@ -31,6 +31,7 @@ from twitter.common import log
 from twitter.common.recordio import ThriftRecordReader
 
 from apache.thermos.common.ckpt import CheckpointDispatcher
+from apache.thermos.common.path import TaskPath
 
 from gen.apache.thermos.ttypes import ProcessState, RunnerCkpt, RunnerState, TaskState
 
@@ -41,16 +42,17 @@ class TaskMonitor(object):
     its runner checkpoint. Also exports information on active processes in the task.
   """
 
-  def __init__(self, pathspec, task_id):
-    """
-    :type pathspec: apache.thermos.common.path.TaskPath
+  def __init__(self, root, task_id):
+    """Construct a TaskMonitor.
+
+    :param root: The checkpoint root of the task.
+    :param task_id: The task id of the task.
     """
-    self._task_id = task_id
+    pathspec = TaskPath(root=root, task_id=task_id)
     self._dispatcher = CheckpointDispatcher()
     self._runnerstate = RunnerState(processes={})
-    self._runner_ckpt = pathspec.given(task_id=task_id).getpath('runner_checkpoint')
-    self._active_file, self._finished_file = (
-        pathspec.given(task_id=task_id, state=state).getpath('task_path')
+    self._runner_ckpt = pathspec.getpath('runner_checkpoint')
+    self._active_file, self._finished_file = (pathspec.given(state=state).getpath('task_path')
         for state in ('active', 'finished'))
     self._ckpt_head = 0
     self._apply_states()
@@ -87,7 +89,7 @@ class TaskMonitor(object):
     except OSError as e:
       if e.errno == errno.ENOENT:
         # The log doesn't yet exist, will retry later.
-        log.warning('Could not read from discovered task %s.' % self._task_id)
+        log.warning('Could not read from checkpoint %s' % self._runner_ckpt)
         return False
       else:
         raise
@@ -100,10 +102,14 @@ class TaskMonitor(object):
     with self._lock:
       return self._apply_states()
 
+  def get_sandbox(self):
+    """Get the sandbox of this task, or None if it has not yet been discovered."""
+    state = self.get_state()
+    if state.header:
+      return state.header.sandbox
+
   def get_state(self):
-    """
-      Get the latest state of this Task.
-    """
+    """Get the latest state of this Task."""
     with self._lock:
       self._apply_states()
       return copy.deepcopy(self._runnerstate)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index b4cb881..c1d7804 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -37,6 +37,7 @@ from operator import attrgetter
 from twitter.common import log
 from twitter.common.collections import RingBuffer
 from twitter.common.concurrent import EventMuxer
+from twitter.common.exceptions import ExceptionalThread
 from twitter.common.lang import Interface
 from twitter.common.quantity import Amount, Time
 
@@ -109,7 +110,7 @@ class ResourceHistory(object):
     return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
 
 
-class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
+class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
   """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
       Actual resource calculation is delegated to collectors; this class periodically polls the
       collectors and aggregates into a representation for the entire task. Also maintains a limited
@@ -119,8 +120,8 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
   MAX_HISTORY = 10000  # magic number
 
   def __init__(self,
+               task_id,
                task_monitor,
-               sandbox,
                process_collector=ProcessTreeCollector,
                disk_collector=DiskCollector,
                process_collection_interval=Amount(20, Time.SECONDS),
@@ -131,14 +132,12 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
       sandbox: Directory for which to monitor disk utilisation
     """
     self._task_monitor = task_monitor  # exposes PIDs, sandbox
-    self._task_id = task_monitor._task_id
+    self._task_id = task_id
     log.debug('Initialising resource collection for task %s' % self._task_id)
     self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
-    # TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
-    # isn't written (and hence the header is not available) by the time we initialise here
-    self._sandbox = sandbox
     self._process_collector_factory = process_collector
-    self._disk_collector = disk_collector(self._sandbox)
+    self._disk_collector_class = disk_collector
+    self._disk_collector = None
     self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
     self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
     min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
@@ -148,7 +147,7 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
     log.debug("Initialising ResourceHistory of length %s" % history_length)
     self._history = ResourceHistory(history_length)
     self._kill_signal = threading.Event()
-    threading.Thread.__init__(self)
+    ExceptionalThread.__init__(self)
     self.daemon = True
 
   def sample(self):
@@ -198,30 +197,29 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
         actives = set(self._get_active_processes())
         current = set(self._process_collectors)
         for process in current - actives:
-          log.debug('Process "%s" (pid %s) no longer active, removing from monitored processes' %
-                   (process.process, process.pid))
           self._process_collectors.pop(process)
         for process in actives - current:
-          log.debug('Adding process "%s" (pid %s) to resource monitoring' %
-                   (process.process, process.pid))
           self._process_collectors[process] = self._process_collector_factory(process.pid)
         for process, collector in self._process_collectors.items():
-          log.debug('Collecting sample for process "%s" (pid %s) and children' %
-                   (process.process, process.pid))
           collector.sample()
 
       if now > next_disk_collection:
         next_disk_collection = now + self._disk_collection_interval
-        log.debug('Collecting disk sample for %s' % self._sandbox)
-        self._disk_collector.sample()
+        if not self._disk_collector:
+          sandbox = self._task_monitor.get_sandbox()
+          if sandbox:
+            self._disk_collector = self._disk_collector_class(sandbox)
+        if self._disk_collector:
+          self._disk_collector.sample()
+        else:
+          log.debug('No sandbox detected yet for %s' % self._task_id)
 
       try:
         aggregated_procs = sum(map(attrgetter('procs'), self._process_collectors.values()))
         aggregated_sample = sum(map(attrgetter('value'), self._process_collectors.values()),
                                 ProcessSample.empty())
-        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample,
-                                                   self._disk_collector.value))
-        log.debug("Recorded resource sample at %s" % now)
+        disk_value = self._disk_collector.value if self._disk_collector else 0
+        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample, disk_value))
       except ValueError as err:
         log.warning("Error recording resource sample: %s" % err)
 
@@ -232,7 +230,12 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
       # - the TaskResourceMonitor has been killed via self._kill_signal
       now = time.time()
       next_collection = min(next_process_collection - now, next_disk_collection - now)
-      EventMuxer(self._kill_signal, self._disk_collector.completed_event
-                ).wait(timeout=max(0, next_collection))
+
+      if self._disk_collector:
+        waiter = EventMuxer(self._kill_signal, self._disk_collector.completed_event)
+      else:
+        waiter = self._kill_signal
+
+      waiter.wait(timeout=max(0, next_collection))
 
     log.debug('Stopping resource monitoring for task "%s"' % self._task_id)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/BUILD b/src/main/python/apache/thermos/observer/BUILD
index ee65f3a..28995b9 100644
--- a/src/main/python/apache/thermos/observer/BUILD
+++ b/src/main/python/apache/thermos/observer/BUILD
@@ -21,22 +21,32 @@ python_library(
     '3rdparty/python:pystachio',
     '3rdparty/python:twitter.common.lang',
     '3rdparty/python:twitter.common.log',
+    'api/src/main/thrift/org/apache/thermos:py-thrift',
     'src/main/python/apache/thermos/common:ckpt',
+    'src/main/python/apache/thermos/common:path',
     'src/main/python/apache/thermos/config',
   ]
 )
 
 python_library(
+  name = 'detector',
+  sources = ['detector.py'],
+  dependencies = [
+    'src/main/python/apache/thermos/monitoring:detector',
+  ]
+)
+
+python_library(
   name = 'task_observer',
   sources = ['task_observer.py'],
   dependencies = [
+    ':detector',
     ':observed_task',
     '3rdparty/python:twitter.common.exceptions',
     '3rdparty/python:twitter.common.lang',
     '3rdparty/python:twitter.common.log',
     '3rdparty/python:twitter.common.quantity',
     'src/main/python/apache/thermos/common:path',
-    'src/main/python/apache/thermos/monitoring:detector',
     'src/main/python/apache/thermos/monitoring:monitor',
     'src/main/python/apache/thermos/monitoring:process',
     'src/main/python/apache/thermos/monitoring:resource',

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/BUILD b/src/main/python/apache/thermos/observer/bin/BUILD
index 15a03f7..a42dbf3 100644
--- a/src/main/python/apache/thermos/observer/bin/BUILD
+++ b/src/main/python/apache/thermos/observer/bin/BUILD
@@ -21,7 +21,8 @@ python_binary(
     '3rdparty/python:twitter.common.app',
     '3rdparty/python:twitter.common.exceptions',
     '3rdparty/python:twitter.common.http',
-    'src/main/python/apache/thermos/common:path',
+    'src/main/python/apache/thermos/common:constants',
+    'src/main/python/apache/thermos/monitoring:detector',
     'src/main/python/apache/thermos/observer/http:http_observer',
     'src/main/python/apache/thermos/observer:task_observer',
   ],

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/thermos_observer.py b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
index effa8c1..213a48e 100644
--- a/src/main/python/apache/thermos/observer/bin/thermos_observer.py
+++ b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
@@ -22,6 +22,7 @@ from twitter.common.http import HttpServer
 from twitter.common.http.diagnostics import DiagnosticsEndpoints
 
 from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
+from apache.thermos.monitoring.detector import FixedPathDetector
 from apache.thermos.observer.http.http_observer import BottleObserver
 from apache.thermos.observer.task_observer import TaskObserver
 
@@ -49,7 +50,7 @@ def proxy_main():
     root_server = HttpServer()
     root_server.mount_routes(DiagnosticsEndpoints())
 
-    task_observer = TaskObserver(opts.root)
+    task_observer = TaskObserver(FixedPathDetector(opts.root))
     task_observer.start()
 
     bottle_wrapper = BottleObserver(task_observer)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/detector.py b/src/main/python/apache/thermos/observer/detector.py
new file mode 100644
index 0000000..5347cde
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/detector.py
@@ -0,0 +1,102 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from collections import namedtuple
+
+from apache.thermos.monitoring.detector import PathDetector, TaskDetector
+
+RootedTask = namedtuple('RootedTask', ('root task_id'))
+
+
+class ObserverTaskDetector(object):
+  """ObserverTaskDetector turns on-disk thermos task transitions into callback events."""
+
+  @classmethod
+  def maybe_callback(cls, callback):
+    if callback is not None:
+      return callback
+    return lambda: True
+
+  def __init__(self,
+               path_detector,
+               on_active=None,
+               on_finished=None,
+               on_removed=None):
+
+    if not isinstance(path_detector, PathDetector):
+      raise TypeError('ObserverTaskDetector takes PathDetector, got %s' % type(path_detector))
+
+    self._path_detector = path_detector
+    self._active_tasks = set()  # (root, task_id) tuple
+    self._finished_tasks = set()  # (root, task_id) tuple
+    self._on_active = self.maybe_callback(on_active)
+    self._on_finished = self.maybe_callback(on_finished)
+    self._on_removed = self.maybe_callback(on_removed)
+
+  @property
+  def active_tasks(self):
+    return self._active_tasks.copy()
+
+  @property
+  def finished_tasks(self):
+    return self._finished_tasks.copy()
+
+  def iter_tasks(self):
+    # returns an iterator of root, task_id, active/finished
+    for root in self._path_detector.get_paths():
+      for status, task_id in TaskDetector(root=root).get_task_ids():
+        yield (root, task_id, status)
+
+  def refresh(self):
+    all_active, all_finished = set(), set()
+
+    for root, task_id, status in self.iter_tasks():
+      task = RootedTask(root, task_id)
+
+      if status == 'active':
+        all_active.add(task)
+        if task in self._active_tasks:
+          continue
+        elif task in self._finished_tasks:
+          assert False, 'Unexpected state.'
+        else:
+          self._active_tasks.add(task)
+          self._on_active(root, task_id)
+      elif status == 'finished':
+        all_finished.add(task)
+        if task in self._active_tasks:
+          self._active_tasks.remove(task)
+          self._finished_tasks.add(task)
+          self._on_finished(root, task_id)
+        elif task in self._finished_tasks:
+          continue
+        else:
+          self._finished_tasks.add(task)
+          self._on_active(root, task_id)
+          self._on_finished(root, task_id)
+      else:
+        assert False, 'Unknown state.'
+
+    all_tasks = all_active | all_finished
+
+    for task in self.active_tasks - all_tasks:
+      self._on_finished(task.root, task.task_id)
+      self._on_removed(task.root, task.task_id)
+
+    for task in self.finished_tasks - all_tasks:
+      self._on_removed(task.root, task.task_id)
+
+    self._active_tasks = all_active
+    self._finished_tasks = all_finished

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/http/file_browser.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
index 87ef9c8..2f48594 100644
--- a/src/main/python/apache/thermos/observer/http/file_browser.py
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -124,6 +124,8 @@ class TaskObserverFileBrowser(object):
     if path == "":
       path = None
     chroot, path = self._observer.valid_path(task_id, path)
+    if chroot is None or path is None:
+      bottle.abort(404, "Sandbox does not exist.")
     return dict(task_id=task_id, chroot=chroot, path=path)
 
   @HttpServer.route("/download/:task_id/:path#.+#")

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/observed_task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py
index f33aecb..08540e1 100644
--- a/src/main/python/apache/thermos/observer/observed_task.py
+++ b/src/main/python/apache/thermos/observer/observed_task.py
@@ -12,6 +12,7 @@
 # limitations under the License.
 #
 
+import copy
 import os
 from abc import abstractproperty
 
@@ -20,9 +21,12 @@ from twitter.common import log
 from twitter.common.lang import AbstractClass
 
 from apache.thermos.common.ckpt import CheckpointDispatcher
+from apache.thermos.common.path import TaskPath
 from apache.thermos.config.loader import ThermosTaskWrapper
 from apache.thermos.config.schema import ThermosContext
 
+from gen.apache.thermos.ttypes import RunnerState
+
 
 class ObservedTask(AbstractClass):
   """ Represents a Task being observed """
@@ -34,11 +38,16 @@ class ObservedTask(AbstractClass):
     except OSError:
       return None
 
-  def __init__(self, task_id, pathspec):
+  def __init__(self, root, task_id):
+    self._root = root
     self._task_id = task_id
-    self._pathspec = pathspec
+    self._pathspec = TaskPath(root=self._root, task_id=self._task_id)
     self._mtime = self._get_mtime()
 
+  @property
+  def root(self):
+    return self._root
+
   @abstractproperty
   def type(self):
     """Indicates the type of task (active or finished)"""
@@ -47,7 +56,7 @@ class ObservedTask(AbstractClass):
     """Read the corresponding task from disk and return a ThermosTask.  Memoizes already-read tasks.
     """
     if self._task_id not in memoized:
-      path = self._pathspec.given(task_id=self._task_id, state=self.type).getpath('task_path')
+      path = self._pathspec.given(state=self.type).getpath('task_path')
       if os.path.exists(path):
         task = ThermosTaskWrapper.from_file(path)
         if task is None:
@@ -63,8 +72,7 @@ class ObservedTask(AbstractClass):
 
   def _get_mtime(self):
     """Retrieve the mtime of the task's state directory"""
-    get_path = lambda state: self._pathspec.given(
-      task_id=self._task_id, state=state).getpath('task_path')
+    get_path = lambda state: self._pathspec.given(state=state).getpath('task_path')
     mtime = self.safe_mtime(get_path('active'))
     if mtime is None:
       mtime = self.safe_mtime(get_path('finished'))
@@ -105,8 +113,8 @@ class ObservedTask(AbstractClass):
 class ActiveObservedTask(ObservedTask):
   """An active Task known by the TaskObserver"""
 
-  def __init__(self, task_id, pathspec, task_monitor, resource_monitor):
-    super(ActiveObservedTask, self).__init__(task_id, pathspec)
+  def __init__(self, root, task_id, task_monitor, resource_monitor):
+    super(ActiveObservedTask, self).__init__(root, task_id)
     self._task_monitor = task_monitor
     self._resource_monitor = resource_monitor
 
@@ -133,8 +141,8 @@ class ActiveObservedTask(ObservedTask):
 class FinishedObservedTask(ObservedTask):
   """A finished Task known by the TaskObserver"""
 
-  def __init__(self, task_id, pathspec):
-    super(FinishedObservedTask, self).__init__(task_id, pathspec)
+  def __init__(self, root, task_id):
+    super(FinishedObservedTask, self).__init__(root, task_id)
     self._state = None
 
   @property
@@ -145,6 +153,6 @@ class FinishedObservedTask(ObservedTask):
   def state(self):
     """Return final state of Task (RunnerState, read from disk and cached for future access)"""
     if self._state is None:
-      path = self._pathspec.given(task_id=self._task_id).getpath('runner_checkpoint')
+      path = self._pathspec.getpath('runner_checkpoint')
       self._state = CheckpointDispatcher.from_file(path)
-    return self._state
+    return copy.deepcopy(self._state) if self._state else RunnerState(processes={})

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index cd528dc..6e7517b 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -30,11 +30,11 @@ from twitter.common.lang import Lockable
 from twitter.common.quantity import Amount, Time
 
 from apache.thermos.common.path import TaskPath
-from apache.thermos.monitoring.detector import TaskDetector
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
 from apache.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
 
+from .detector import ObserverTaskDetector
 from .observed_task import ActiveObservedTask, FinishedObservedTask
 
 from gen.apache.thermos.ttypes import ProcessState, TaskState
@@ -52,14 +52,17 @@ class TaskObserver(ExceptionalThread, Lockable):
   class UnexpectedError(Exception): pass
   class UnexpectedState(Exception): pass
 
-  POLLING_INTERVAL = Amount(1, Time.SECONDS)
+  POLLING_INTERVAL = Amount(5, Time.SECONDS)
 
-  def __init__(self, root, resource_monitor_class=TaskResourceMonitor):
-    self._pathspec = TaskPath(root=root)
-    self._detector = TaskDetector(root)
+  def __init__(self, path_detector, resource_monitor_class=TaskResourceMonitor):
+    self._detector = ObserverTaskDetector(
+        path_detector,
+        self.__on_active,
+        self.__on_finished,
+        self.__on_removed)
     if not issubclass(resource_monitor_class, ResourceMonitorBase):
       raise ValueError("resource monitor class must implement ResourceMonitorBase!")
-    self._resource_monitor = resource_monitor_class
+    self._resource_monitor_class = resource_monitor_class
     self._active_tasks = {}    # task_id => ActiveObservedTask
     self._finished_tasks = {}  # task_id => FinishedObservedTask
     self._stop_event = threading.Event()
@@ -88,42 +91,34 @@ class TaskObserver(ExceptionalThread, Lockable):
   def start(self):
     ExceptionalThread.start(self)
 
-  @Lockable.sync
-  def add_active_task(self, task_id):
+  def __on_active(self, root, task_id):
+    log.debug('on_active(%r, %r)' % (root, task_id))
     if task_id in self.finished_tasks:
       log.error('Found an active task (%s) in finished tasks?' % task_id)
       return
-    task_monitor = TaskMonitor(self._pathspec, task_id)
-    if not task_monitor.get_state().header:
-      log.info('Unable to load task "%s"' % task_id)
-      return
-    sandbox = task_monitor.get_state().header.sandbox
-    resource_monitor = self._resource_monitor(task_monitor, sandbox)
+    task_monitor = TaskMonitor(root, task_id)
+    resource_monitor = self._resource_monitor_class(task_id, task_monitor)
     resource_monitor.start()
     self._active_tasks[task_id] = ActiveObservedTask(
-      task_id=task_id, pathspec=self._pathspec,
-      task_monitor=task_monitor, resource_monitor=resource_monitor
+        root,
+        task_id,
+        task_monitor,
+        resource_monitor
     )
 
-  @Lockable.sync
-  def add_finished_task(self, task_id):
-    self._finished_tasks[task_id] = FinishedObservedTask(
-      task_id=task_id, pathspec=self._pathspec
-    )
+  def __on_finished(self, root, task_id):
+    log.debug('on_finished(%r, %r)' % (root, task_id))
+    active_task = self._active_tasks.pop(task_id, None)
+    if active_task:
+      active_task.resource_monitor.kill()
+    self._finished_tasks[task_id] = FinishedObservedTask(root, task_id)
 
-  @Lockable.sync
-  def active_to_finished(self, task_id):
-    self.remove_active_task(task_id)
-    self.add_finished_task(task_id)
-
-  @Lockable.sync
-  def remove_active_task(self, task_id):
-    task = self.active_tasks.pop(task_id)
-    task.resource_monitor.kill()
-
-  @Lockable.sync
-  def remove_finished_task(self, task_id):
-    self.finished_tasks.pop(task_id)
+  def __on_removed(self, root, task_id):
+    log.debug('on_removed(%r, %r)' % (root, task_id))
+    active_task = self._active_tasks.pop(task_id, None)
+    if active_task:
+      active_task.resource_monitor.kill()
+    self._finished_tasks.pop(task_id, None)
 
   def run(self):
     """
@@ -133,32 +128,8 @@ class TaskObserver(ExceptionalThread, Lockable):
     """
     while not self._stop_event.is_set():
       time.sleep(self.POLLING_INTERVAL.as_(Time.SECONDS))
-
-      active_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='active')]
-      finished_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
-
       with self.lock:
-
-        # Ensure all tasks currently detected on the system are observed appropriately
-        for active in active_tasks:
-          if active not in self.active_tasks:
-            log.debug('task_id %s (unknown) -> active' % active)
-            self.add_active_task(active)
-        for finished in finished_tasks:
-          if finished in self.active_tasks:
-            log.debug('task_id %s active -> finished' % finished)
-            self.active_to_finished(finished)
-          elif finished not in self.finished_tasks:
-            log.debug('task_id %s (unknown) -> finished' % finished)
-            self.add_finished_task(finished)
-
-        # Remove ObservedTasks for tasks no longer detected on the system
-        for unknown in set(self.active_tasks) - set(active_tasks + finished_tasks):
-          log.debug('task_id %s active -> (unknown)' % unknown)
-          self.remove_active_task(unknown)
-        for unknown in set(self.finished_tasks) - set(active_tasks + finished_tasks):
-          log.debug('task_id %s finished -> (unknown)' % unknown)
-          self.remove_finished_task(unknown)
+        self._detector.refresh()
 
   @Lockable.sync
   def process_from_name(self, task_id, process_id):
@@ -183,11 +154,9 @@ class TaskObserver(ExceptionalThread, Lockable):
 
   @Lockable.sync
   def task_id_count(self):
-    """
-      Return the raw count of active and finished task_ids from the TaskDetector.
-    """
-    num_active = len(list(self._detector.get_task_ids(state='active')))
-    num_finished = len(list(self._detector.get_task_ids(state='finished')))
+    """Return the raw count of active and finished task_ids."""
+    num_active = len(self._detector.active_tasks)
+    num_finished = len(self._detector.finished_tasks)
     return dict(active=num_active, finished=num_finished, all=num_active + num_finished)
 
   def _get_tasks_of_type(self, type):
@@ -581,8 +550,18 @@ class TaskObserver(ExceptionalThread, Lockable):
     run = self.get_run_number(runner_state, process, run)
     if run is None:
       return {}
-    log_path = self._pathspec.given(task_id=task_id, process=process, run=run,
-                                    log_dir=runner_state.header.log_dir).getpath('process_logdir')
+    observed_task = self.all_tasks.get(task_id, None)
+    if not observed_task:
+      return {}
+
+    log_path = TaskPath(
+        root=observed_task.root,
+        task_id=task_id,
+        process=process,
+        run=run,
+        log_dir=runner_state.header.log_dir,
+    ).getpath('process_logdir')
+
     return dict(
       stdout=[log_path, 'stdout'],
       stderr=[log_path, 'stderr']
@@ -628,7 +607,7 @@ class TaskObserver(ExceptionalThread, Lockable):
     except AttributeError:
       return None, None
     chroot, path = self._sanitize_path(chroot, path)
-    if chroot and path:
+    if chroot and path and os.path.exists(os.path.join(chroot, path)):
       return chroot, path
     return None, None
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/BUILD b/src/test/python/apache/aurora/executor/BUILD
index b8dd28c..013f056 100644
--- a/src/test/python/apache/aurora/executor/BUILD
+++ b/src/test/python/apache/aurora/executor/BUILD
@@ -66,7 +66,6 @@ python_tests(name = 'thermos_executor',
     '3rdparty/python:twitter.common.app',
     '3rdparty/python:twitter.common.exceptions',
     '3rdparty/python:twitter.common.quantity',
-    'src/main/python/apache/thermos/common',
     'src/main/python/apache/thermos/core',
     'src/main/python/apache/thermos/monitoring:monitor',
     'src/main/python/apache/aurora/config:schema',

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 7b73f69..b3da27b 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -154,5 +154,6 @@ python_tests(
     'src/main/python/apache/aurora/config:schema',
     'src/main/python/apache/aurora/executor/common:sandbox',
     'src/main/python/apache/aurora/executor/common:resource_manager',
+    'src/main/python/apache/thermos/core:helper',
   ]
 )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
index 8f288f6..c473808 100644
--- a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
+++ b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
@@ -18,12 +18,15 @@ import threading
 import mock
 from mesos.interface import mesos_pb2
 from twitter.common.contextutil import temporary_dir
-from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.resource_manager import ResourceManagerProvider
 from apache.aurora.executor.common.sandbox import DirectorySandbox
+from apache.thermos.common.path import TaskPath
+from apache.thermos.core.helper import TaskRunnerHelper
 from apache.thermos.monitoring.disk import DiskCollector
 
+from gen.apache.thermos.ttypes import RunnerCkpt, RunnerHeader
+
 
 # TODO(jcohen): There should really be a single canonical source for creating test jobs/tasks
 def make_assigned_task(thermos_config, assigned_ports=None):
@@ -67,10 +70,22 @@ def make_job(role, environment, name, primary_port, portmap):
   return job
 
 
+def write_header(root, sandbox, task_id):
+  log_dir = os.path.join(sandbox, '.logs')
+  path = TaskPath(root=root, task_id=task_id, log_dir=log_dir)
+  header = RunnerHeader(task_id=task_id, sandbox=sandbox, log_dir=log_dir)
+  ckpt = TaskRunnerHelper.open_checkpoint(path.getpath('runner_checkpoint'))
+  ckpt.write(RunnerCkpt(runner_header=header))
+  ckpt.close()
+
+
 def test_resource_manager():
   with temporary_dir() as td:
+    assigned_task = make_assigned_task(
+        make_job('some-role', 'some-env', 'some-job', 'http', portmap={'http': 80}))
     sandbox = os.path.join(td, 'sandbox')
     root = os.path.join(td, 'thermos')
+    write_header(root, sandbox, assigned_task.taskId)
 
     mock_disk_collector_class = mock.create_autospec(DiskCollector, spec_set=True)
     mock_disk_collector = mock_disk_collector_class.return_value
@@ -80,18 +95,11 @@ def test_resource_manager():
     type(mock_disk_collector).value = value_mock
 
     completed_event = threading.Event()
-    completed_event.set()
-    completed_mock = mock.PropertyMock(completed_event)
+    completed_mock = mock.PropertyMock(return_value=completed_event)
     type(mock_disk_collector).completed_event = completed_mock
 
-    rmp = ResourceManagerProvider(
-        root,
-        disk_collector=mock_disk_collector_class,
-        disk_collection_interval=Amount(1, Time.SECONDS))
-    rm = rmp.from_assigned_task(
-        make_assigned_task(
-            make_job('some-role', 'some-env', 'some-job', 'http', portmap={'http': 80})),
-        DirectorySandbox(sandbox))
+    rmp = ResourceManagerProvider(root, disk_collector=mock_disk_collector_class)
+    rm = rmp.from_assigned_task(assigned_task, DirectorySandbox(sandbox))
 
     assert rm.status is None
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/aurora/executor/test_thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py
index 6cc928e..26b26f4 100644
--- a/src/test/python/apache/aurora/executor/test_thermos_executor.py
+++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py
@@ -53,7 +53,6 @@ from apache.aurora.executor.thermos_task_runner import (
     DefaultThermosTaskRunnerProvider,
     ThermosTaskRunner
 )
-from apache.thermos.common.path import TaskPath
 from apache.thermos.core.runner import TaskRunner
 from apache.thermos.monitoring.monitor import TaskMonitor
 
@@ -288,7 +287,7 @@ class TestThermosExecutor(object):
           sandbox_provider=DefaultTestSandboxProvider())
       te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
       te.terminated.wait()
-      tm = TaskMonitor(TaskPath(root=tempdir), task_id=HELLO_WORLD_TASK_ID)
+      tm = TaskMonitor(tempdir, task_id=HELLO_WORLD_TASK_ID)
       runner_state = tm.get_state()
 
     assert 'hello_world_hello_world-001' in runner_state.processes, (
@@ -313,7 +312,7 @@ class TestThermosExecutor(object):
       while te._status_manager is None:
         time.sleep(0.1)
       te.terminated.wait()
-      tm = TaskMonitor(TaskPath(root=tempdir), task_id=HELLO_WORLD_TASK_ID)
+      tm = TaskMonitor(tempdir, task_id=HELLO_WORLD_TASK_ID)
       runner_state = tm.get_state()
 
     assert 'hello_world_hello_world-001' in runner_state.processes, (

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/thermos/bin/test_thermos.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/bin/test_thermos.py b/src/test/python/apache/thermos/bin/test_thermos.py
index 2d9d333..ae15571 100644
--- a/src/test/python/apache/thermos/bin/test_thermos.py
+++ b/src/test/python/apache/thermos/bin/test_thermos.py
@@ -12,4 +12,7 @@
 # limitations under the License.
 #
 
-from apache.thermos.bin import thermos  # noqa
+
+def test_thermos_binary_import():
+  from apache.thermos.bin import thermos  # noqa
+  assert thermos is not None

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/thermos/core/test_staged_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_staged_kill.py b/src/test/python/apache/thermos/core/test_staged_kill.py
index faa23ae..b52fcba 100644
--- a/src/test/python/apache/thermos/core/test_staged_kill.py
+++ b/src/test/python/apache/thermos/core/test_staged_kill.py
@@ -79,7 +79,7 @@ class RunnerBase(object):
 class ProcessPidTestCase(object):
   def test_process_kill(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
 
     process_state, run_number = tm.get_active_processes()[0]
@@ -109,7 +109,7 @@ class TestRunnerKill(RunnerBase, ProcessPidTestCase):
 
   def test_coordinator_kill(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
 
     process_state, run_number = tm.get_active_processes()[0]
@@ -140,7 +140,7 @@ class TestRunnerKillProcessTrappingSIGTERM(RunnerBase):
 
   def test_coordinator_kill(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
     process_state, run_number = tm.get_active_processes()[0]
     assert process_state.process == 'ignorant_process'
@@ -179,7 +179,7 @@ class TestRunnerKillProcessTrappingSIGTERM(RunnerBase):
 
   def test_coordinator_dead_kill(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
     process_state, run_number = tm.get_active_processes()[0]
     assert process_state.process == 'ignorant_process'
@@ -201,7 +201,7 @@ class TestRunnerKillProcessTrappingSIGTERM(RunnerBase):
   @pytest.mark.skipif('True')
   def test_preemption_wait(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
     process_state, run_number = tm.get_active_processes()[0]
     assert process_state.process == 'ignorant_process'
@@ -251,7 +251,7 @@ class TestRunnerKillProcessGroup(RunnerBase):
 
   def test_pg_is_killed(self):
     runner = self.start_runner()
-    tm = TaskMonitor(runner.pathspec, runner.task_id)
+    tm = TaskMonitor(runner.tempdir, runner.task_id)
     self.wait_until_running(tm)
     process_state, run_number = tm.get_active_processes()[0]
     assert process_state.process == 'process'

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/thermos/monitoring/test_resource.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/monitoring/test_resource.py b/src/test/python/apache/thermos/monitoring/test_resource.py
index 52d8139..a7ab360 100644
--- a/src/test/python/apache/thermos/monitoring/test_resource.py
+++ b/src/test/python/apache/thermos/monitoring/test_resource.py
@@ -17,7 +17,6 @@ from unittest import TestCase
 
 import mock
 
-from apache.thermos.common.path import TaskPath
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
 from apache.thermos.monitoring.resource import (
@@ -64,14 +63,14 @@ class TestTaskResouceMonitor(TestCase):
       autospec=True, spec_set=True)
   def test_sample_by_process(self, mock_get_active_processes, mock_sample):
     fake_process_name = 'fake-process-name'
-    task_path = TaskPath(root='.')
+    task_path = '.'
     task_monitor = TaskMonitor(task_path, 'fake-task-id')
     fake_process_status = ProcessStatus(process=fake_process_name)
     mock_get_active_processes.return_value = [(fake_process_status, 1)]
     fake_process_sample = ProcessSample.empty()
     mock_sample.return_value = fake_process_sample
 
-    task_resource_monitor = TaskResourceMonitor(task_monitor, '.')
+    task_resource_monitor = TaskResourceMonitor('fake-task-id', task_monitor)
 
     assert fake_process_sample == task_resource_monitor.sample_by_process(fake_process_name)
     assert mock_get_active_processes.mock_calls == [mock.call(task_monitor)]
@@ -81,12 +80,12 @@ class TestTaskResouceMonitor(TestCase):
   @mock.patch('apache.thermos.monitoring.monitor.TaskMonitor.get_active_processes',
       autospec=True, spec_set=True)
   def test_sample_by_process_no_process(self, mock_get_active_processes):
-    task_path = TaskPath(root='.')
+    task_path = '.'
 
     task_monitor = TaskMonitor(task_path, 'fake-task-id')
     mock_get_active_processes.return_value = []
 
-    task_resource_monitor = TaskResourceMonitor(task_monitor, '.')
+    task_resource_monitor = TaskResourceMonitor('fake-task-id', task_monitor)
 
     with self.assertRaises(ValueError):
       task_resource_monitor.sample_by_process('fake-process-name')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/observer/BUILD b/src/test/python/apache/thermos/observer/BUILD
new file mode 100644
index 0000000..ff92a52
--- /dev/null
+++ b/src/test/python/apache/thermos/observer/BUILD
@@ -0,0 +1,30 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+python_test_suite(
+  name = 'all',
+  dependencies = [
+    ':test_detector',
+  ]
+)
+
+python_tests(
+  name = 'test_detector',
+  sources = ['test_detector.py'],
+  dependencies = [
+    '3rdparty/python:mock',
+    'src/main/python/apache/thermos/observer:detector',
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c0b37015/src/test/python/apache/thermos/observer/test_detector.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/observer/test_detector.py b/src/test/python/apache/thermos/observer/test_detector.py
new file mode 100644
index 0000000..cacb49d
--- /dev/null
+++ b/src/test/python/apache/thermos/observer/test_detector.py
@@ -0,0 +1,161 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import random
+from contextlib import contextmanager
+
+import mock
+import pytest
+
+from apache.thermos.monitoring.detector import PathDetector
+from apache.thermos.observer.detector import ObserverTaskDetector
+
+
+class EmptyPathDetector(PathDetector):
+  def get_paths(self):
+    return []
+
+
+class PatchingObserverTaskDetector(ObserverTaskDetector):
+  def __init__(self, *args, **kw):
+    self.__tasks = []
+    super(PatchingObserverTaskDetector, self).__init__(EmptyPathDetector(), *args, **kw)
+
+  def iter_tasks(self):
+    return iter(self.__tasks)
+
+  @contextmanager
+  def patched_tasks(self, active_tasks=(), finished_tasks=(), shuffle=True):
+    old_tasks = self.__tasks
+
+    tasks = [(root, t_id, 'active') for (root, t_id) in active_tasks] + [
+             (root, t_id, 'finished') for (root, t_id) in finished_tasks]
+
+    if shuffle:
+      random.shuffle(tasks)
+
+    self.__tasks = tasks
+    yield
+    self.__tasks = old_tasks
+
+
+def test_observer_task_detector_construction():
+  pod = PatchingObserverTaskDetector()
+  pod.refresh()
+  assert pod.active_tasks == set()
+  assert pod.finished_tasks == set()
+
+
+# eight transitions, where (n/e) means the task does not exist (is not detected):
+#    #1 (n/e) -> active
+#    #2 (n/e) -> finished
+#    #3 active -> active
+#    #4 active -> finished
+#    #5 active -> (n/e)
+#    #6 finished -> active (Fail: refresh() raises AssertionError)
+#    #7 finished -> finished
+#    #8 finished -> (n/e)
+def make_mocks():
+  on_active, on_finished, on_removed = mock.Mock(), mock.Mock(), mock.Mock()
+  pod = PatchingObserverTaskDetector(
+      on_active=on_active,
+      on_finished=on_finished,
+      on_removed=on_removed,
+  )
+  return pod, on_active, on_finished, on_removed
+
+
+TASK1 = ('root1', 'task1')
+TASK2 = ('root2', 'task2')
+
+
+def test_observer_task_detector_standard_transitions():
+  pod, on_active, on_finished, on_removed = make_mocks()
+
+  with pod.patched_tasks(active_tasks=(TASK1,)):  # 1
+    pod.refresh()
+    assert pod.active_tasks == set([TASK1])
+    assert pod.finished_tasks == set()
+    on_active.assert_called_once_with(*TASK1)
+    assert on_finished.call_count == 0
+    assert on_removed.call_count == 0
+    on_active.reset_mock()
+
+  with pod.patched_tasks(active_tasks=(TASK1,)):  # 3
+    pod.refresh()
+    assert pod.active_tasks == set([TASK1])
+    assert pod.finished_tasks == set()
+    assert on_active.call_count == 0
+    assert on_finished.call_count == 0
+    assert on_removed.call_count == 0
+
+  with pod.patched_tasks(finished_tasks=(TASK1,)):  # 4
+    pod.refresh()
+    assert pod.active_tasks == set()
+    assert pod.finished_tasks == set([TASK1])
+    on_finished.assert_called_once_with(*TASK1)
+    assert on_active.call_count == 0
+    assert on_removed.call_count == 0
+    on_finished.reset_mock()
+
+  with pod.patched_tasks(finished_tasks=(TASK1,)):  # 7
+    pod.refresh()
+    assert pod.active_tasks == set()
+    assert pod.finished_tasks == set([TASK1])
+    assert on_finished.call_count == 0
+    assert on_active.call_count == 0
+    assert on_removed.call_count == 0
+
+  with pod.patched_tasks():  # 8
+    pod.refresh()
+    assert pod.active_tasks == set()
+    assert pod.finished_tasks == set()
+    on_removed.assert_called_once_with(*TASK1)
+    assert on_active.call_count == 0
+    assert on_finished.call_count == 0
+    on_removed.reset_mock()
+
+
+def test_observer_task_detector_nonstandard_transitions():
+  pod, on_active, on_finished, on_removed = make_mocks()
+
+  with pod.patched_tasks(active_tasks=(TASK1,)):
+    pod.refresh()
+    assert pod.active_tasks == set([TASK1])
+    on_active.reset_mock()
+
+  with pod.patched_tasks():  # 5
+    pod.refresh()
+    assert pod.active_tasks == set()
+    assert pod.finished_tasks == set()
+    on_finished.assert_called_once_with(*TASK1)
+    on_removed.assert_called_once_with(*TASK1)
+    assert on_active.call_count == 0
+    on_removed.reset_mock()
+    on_finished.reset_mock()
+
+  with pod.patched_tasks(finished_tasks=(TASK2,)):  # 2
+    pod.refresh()
+    assert pod.active_tasks == set()
+    assert pod.finished_tasks == set([TASK2])
+    on_active.assert_called_once_with(*TASK2)
+    on_finished.assert_called_once_with(*TASK2)
+    assert on_removed.call_count == 0
+    on_active.reset_mock()
+    on_finished.reset_mock()
+
+  with pod.patched_tasks(active_tasks=(TASK2,)):  # 6
+    with pytest.raises(AssertionError):
+      pod.refresh()


Mime
View raw message