aurora-commits mailing list archives

From wick...@apache.org
Subject git commit: AURORA-186: rename ThermosExecutor to AuroraExecutor, clean up thermos-isms from ExecutorBase
Date Fri, 21 Feb 2014 19:47:08 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 445ecd778 -> e6cd6a9ea


AURORA-186: rename ThermosExecutor to AuroraExecutor, clean up thermos-isms from ExecutorBase

Split all Thermos-specific code out of thermos_executor.py.
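
For downstream callers the rename is mechanical; a minimal sketch of constructing the
renamed executor, based on the updated thermos_executor_main.py in the diff below (the
runner pex path is a placeholder):

  import os

  from apache.aurora.executor.aurora_executor import AuroraExecutor
  from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider

  # Thermos-specific knowledge now lives in the TaskRunnerProvider; the executor itself
  # is generic and only deals in the TaskRunner/StatusChecker interfaces.
  thermos_runner_provider = DefaultThermosTaskRunnerProvider(
      '/path/to/thermos_runner.pex',      # placeholder path to the Thermos runner pex
      artifact_dir=os.path.realpath('.'),
  )
  executor = AuroraExecutor(runner_provider=thermos_runner_provider)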

Testing Done:

This adds the executor-small and executor-large test targets; the latter runs the
executor tests.
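
For reference, the two new targets can be invoked from the repository root as:

  ./pants src/test/python/apache/aurora/executor:executor-small
  ./pants src/test/python/apache/aurora/executor:executor-large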

Both ./pants src/test/python/apache/aurora/executor:executor-large and the e2e tests
pass locally for me.

Bugs closed: AURORA-186

Reviewed at https://reviews.apache.org/r/18239/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e6cd6a9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e6cd6a9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e6cd6a9e

Branch: refs/heads/master
Commit: e6cd6a9eaa2f0a9a38ec710765bd4b36df085a09
Parents: 445ecd7
Author: Brian Wickman <wickman@apache.org>
Authored: Fri Feb 21 11:46:55 2014 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Fri Feb 21 11:46:55 2014 -0800

----------------------------------------------------------------------
 src/main/python/apache/aurora/executor/BUILD    |  13 +-
 .../python/apache/aurora/executor/__init__.py   |   2 +-
 .../apache/aurora/executor/aurora_executor.py   | 301 ++++++++++++++++++
 .../python/apache/aurora/executor/bin/BUILD     |   2 +-
 .../executor/bin/thermos_executor_main.py       |   8 +-
 .../apache/aurora/executor/executor_base.py     |  51 +---
 .../apache/aurora/executor/gc_executor.py       |  36 ++-
 .../apache/aurora/executor/thermos_executor.py  | 304 -------------------
 .../aurora/executor/thermos_task_runner.py      |   5 +-
 src/test/python/apache/aurora/executor/BUILD    |  29 +-
 .../aurora/executor/test_thermos_executor.py    |  50 +--
 src/test/python/apache/thermos/common/BUILD     |   2 -
 12 files changed, 403 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/BUILD b/src/main/python/apache/aurora/executor/BUILD
index 667975b..9f0cad0 100644
--- a/src/main/python/apache/aurora/executor/BUILD
+++ b/src/main/python/apache/aurora/executor/BUILD
@@ -67,22 +67,20 @@ python_library(
 )
 
 python_library(
-  name = 'thermos_executor_base',
+  name = 'executor_base',
   sources = ['executor_base.py'],
   dependencies = [
     pants('3rdparty/python:mesos'),
     pants('3rdparty/python:twitter.common.log'),
-    pants('src/main/thrift/org/apache/thermos:py-thrift'),
-    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )
 
 python_library(
-  name = 'thermos_executor',
-  sources = ['thermos_executor.py'],
+  name = 'aurora_executor',
+  sources = ['aurora_executor.py'],
   dependencies = [
     pants(':status_manager'),
-    pants(':thermos_executor_base'),
+    pants(':executor_base'),
     pants('3rdparty/python:mesos'),
     pants('3rdparty/python:pystachio'),
     pants('3rdparty/python:twitter.common.app'),
@@ -113,7 +111,7 @@ python_library(
   sources = ['gc_executor.py'],
   dependencies = [
     pants(':executor_detector'),
-    pants(':thermos_executor_base'),
+    pants(':executor_base'),
     pants('3rdparty/python:mesos'),
     pants('3rdparty/python:psutil'),
     pants('3rdparty/python:twitter.common.collections'),
@@ -130,6 +128,7 @@ python_library(
     pants('src/main/python/apache/aurora/config:schema'),
     pants('src/main/python/apache/aurora/executor/common:sandbox'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+    pants('src/main/thrift/org/apache/thermos:py-thrift'),
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/__init__.py b/src/main/python/apache/aurora/executor/__init__.py
index d54d88c..6cfe8d6 100644
--- a/src/main/python/apache/aurora/executor/__init__.py
+++ b/src/main/python/apache/aurora/executor/__init__.py
@@ -14,4 +14,4 @@
 # limitations under the License.
 #
 
-__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file
+__import__('pkg_resources').declare_namespace(__name__)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
new file mode 100644
index 0000000..28e5054
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -0,0 +1,301 @@
+#
+# Copyright 2013 Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import threading
+import time
+import traceback
+
+from .common.kill_manager import KillManager
+from .common.sandbox import DirectorySandbox, SandboxProvider
+from .common.status_checker import ChainedStatusChecker, ExitState
+from .common.task_info import assigned_task_from_mesos_task
+from .common.task_runner import (
+    TaskError,
+    TaskRunner,
+    TaskRunnerProvider,
+)
+from .executor_base import ExecutorBase
+from .status_manager import StatusManager
+
+import mesos_pb2 as mesos_pb
+from twitter.common import log
+from twitter.common.concurrent import deadline, defer, Timeout
+from twitter.common.metrics import Observable
+from twitter.common.quantity import Amount, Time
+
+
+class DefaultSandboxProvider(SandboxProvider):
+  SANDBOX_NAME = 'sandbox'
+
+  def from_assigned_task(self, assigned_task):
+    return DirectorySandbox(
+        os.path.realpath(self.SANDBOX_NAME),
+        assigned_task.task.owner.role)
+
+
+class AuroraExecutor(ExecutorBase, Observable):
+  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
+  SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES)
+  START_TIMEOUT = Amount(2, Time.MINUTES)
+  STOP_TIMEOUT = Amount(2, Time.MINUTES)
+  STOP_WAIT = Amount(5, Time.SECONDS)
+
+  def __init__(self,
+               runner_provider,
+               status_manager_class=StatusManager,
+               sandbox_provider=DefaultSandboxProvider,
+               status_providers=(),
+               clock=time):
+
+    ExecutorBase.__init__(self)
+    if not isinstance(runner_provider, TaskRunnerProvider):
+      raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' %
+          type(runner_provider))
+    self._runner = None
+    self._runner_provider = runner_provider
+    self._clock = clock
+    self._task_id = None
+    self._status_providers = status_providers
+    self._status_manager = None
+    self._status_manager_class = status_manager_class
+    self._sandbox = None
+    self._sandbox_provider = sandbox_provider()
+    self._kill_manager = KillManager()
+    # Events that are exposed for interested entities
+    self.runner_aborted = threading.Event()
+    self.runner_started = threading.Event()
+    self.sandbox_initialized = threading.Event()
+    self.sandbox_created = threading.Event()
+    self.terminated = threading.Event()
+    self.launched = threading.Event()
+
+  @property
+  def runner(self):
+    return self._runner
+
+  def _die(self, driver, status, msg):
+    log.fatal(msg)
+    self.send_update(driver, self._task_id, status, msg)
+    defer(driver.stop, delay=self.STOP_WAIT)
+
+  def _run(self, driver, assigned_task):
+    """
+      Commence running a Task.
+        - Initialize the sandbox
+        - Start the ThermosTaskRunner (fork the Thermos TaskRunner)
+        - Set up necessary HealthCheckers
+        - Set up StatusManager, and attach HealthCheckers
+    """
+    self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.')
+
+    if not self._initialize_sandbox(driver, assigned_task):
+      return
+
+    # start the process on a separate thread and give the message processing thread back
+    # to the driver
+    try:
+      self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox)
+    except TaskError as e:
+      self.runner_aborted.set()
+      self._die(driver, mesos_pb.TASK_FAILED, str(e))
+      return
+
+    if not isinstance(self._runner, TaskRunner):
+      self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!')
+      return
+
+    if not self._start_runner(driver, assigned_task):
+      return
+
+    self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING)
+
+    self._start_status_manager(driver, assigned_task)
+
+  def _initialize_sandbox(self, driver, assigned_task):
+    self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)
+    self.sandbox_initialized.set()
+    try:
+      deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT,
+               daemon=True, propagate=True)
+    except Timeout:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!')
+      return
+    except self._sandbox.Error as e:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e)
+      return
+    self.sandbox_created.set()
+    return True
+
+  def _start_runner(self, driver, assigned_task):
+    if self.runner_aborted.is_set():
+      self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.')
+
+    try:
+      deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True)
+    except TaskError as e:
+      self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e)
+      return False
+    except Timeout:
+      self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!')
+      return False
+
+    self.runner_started.set()
+    log.debug('Task started.')
+
+    return True
+
+  def _start_status_manager(self, driver, assigned_task):
+    status_checkers = [self._kill_manager]
+    self.metrics.register_observable('kill_manager', self._kill_manager)
+
+    for status_provider in self._status_providers:
+      status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
+      if status_checker is None:
+        continue
+      status_checkers.append(status_checker)
+      # self.metrics.register_observable()
+
+    self._chained_checker = ChainedStatusChecker(status_checkers)
+    self._chained_checker.start()
+
+    # chain the runner to the other checkers, but do not chain .start()/.stop()
+    complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
+    self._status_manager = self._status_manager_class(
+        complete_checker, self._shutdown, clock=self._clock)
+    self._status_manager.start()
+
+  def _signal_kill_manager(self, driver, task_id, reason):
+    if self._task_id is None:
+      log.error('Was asked to kill task but no task running!')
+      return
+    if task_id != self._task_id:
+      log.error('Asked to kill a task other than what we are running!')
+      return
+    if not self.sandbox_created.is_set():
+      log.error('Asked to kill task with incomplete sandbox - aborting runner start')
+      self.runner_aborted.set()
+      return
+    self.log('Activating kill manager.')
+    self._kill_manager.kill(reason)
+
+  @classmethod
+  def translate_exit_state_to_mesos(cls, exit_state):
+    # Put into executor_base?
+    if exit_state == ExitState.FAILED:
+      return mesos_pb.TASK_FAILED
+    elif exit_state == ExitState.KILLED:
+      return mesos_pb.TASK_KILLED
+    elif exit_state == ExitState.FINISHED:
+      return mesos_pb.TASK_FINISHED
+    elif exit_state == ExitState.LOST:
+      return mesos_pb.TASK_LOST
+    log.error('Unknown exit state, defaulting to TASK_FINISHED.')
+    return mesos_pb.TASK_FINISHED
+
+  def _shutdown(self, status_result):
+    runner_status = self._runner.status
+
+    try:
+      deadline(self._runner.stop, timeout=self.STOP_TIMEOUT)
+    except Timeout:
+      log.error('Failed to stop runner within deadline.')
+
+    try:
+      deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT)
+    except Timeout:
+      log.error('Failed to stop all checkers within deadline.')
+
+    # If the runner was alive when _shutdown was called, defer to the status_result,
+    # otherwise the runner's terminal state is the preferred state.
+    exit_status = runner_status or status_result
+
+    self.send_update(
+        self._driver,
+        self._task_id,
+        self.translate_exit_state_to_mesos(exit_status.status),
+        status_result.reason)
+
+    self.terminated.set()
+    defer(self._driver.stop, delay=self.PERSISTENCE_WAIT)
+
+  @classmethod
+  def validate_task(cls, task):
+    try:
+      assigned_task = assigned_task_from_mesos_task(task)
+      return assigned_task
+    except Exception as e:
+      log.fatal('Could not deserialize AssignedTask')
+      log.fatal(traceback.format_exc())
+      return None
+
+  """ Mesos Executor API methods follow """
+
+  def launchTask(self, driver, task):
+    """
+      Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks).
+      Note that this task can be realized with a thread, a process, or some simple computation,
+      however, no other callbacks will be invoked on this executor until this callback has returned.
+    """
+    self.launched.set()
+    self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value))
+
+    # TODO(wickman)  Update the tests to call registered(), then remove this line and issue
+    # an assert if self._driver is not populated.
+    self._driver = driver
+
+    if self._runner:
+      log.error('Already running a task! %s' % self._task_id)
+      self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST,
+          "Task already running on this executor: %s" % self._task_id)
+      return
+
+    self._slave_id = task.slave_id.value
+    self._task_id = task.task_id.value
+
+    assigned_task = self.validate_task(task)
+    if not assigned_task:
+      self.send_update(driver, self._task_id, mesos_pb.TASK_FAILED,
+          'Could not deserialize task.')
+      defer(driver.stop, delay=self.STOP_WAIT)
+      return
+
+    defer(lambda: self._run(driver, assigned_task))
+
+  def killTask(self, driver, task_id):
+    """
+     Invoked when a task running within this executor has been killed (via
+     SchedulerDriver::killTask). Note that no status update will be sent on behalf of the executor,
+     the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and invoking
+     ExecutorDriver::sendStatusUpdate.
+    """
+    self.log('killTask got task_id: %s' % task_id)
+    self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.")
+    self.log('killTask returned.')
+
+  def shutdown(self, driver):
+    """
+     Invoked when the executor should terminate all of its currently running tasks. Note that after
+     Mesos has determined that an executor has terminated any tasks that the executor did not send
+     terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc) a TASK_LOST
+     status update will be created.
+
+    """
+    self.log('shutdown called')
+    if self._task_id:
+      self.log('shutting down %s' % self._task_id)
+      self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.")
+    self.log('shutdown returned')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 2dcc69b..6d5330b 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -29,7 +29,7 @@ python_binary(
     pants('src/main/python/apache/aurora/executor/common:sandbox'),
     pants('src/main/python/apache/aurora/executor:executor_detector'),
     pants('src/main/python/apache/aurora/executor:executor_vars'),
-    pants('src/main/python/apache/aurora/executor:thermos_executor'),
+    pants('src/main/python/apache/aurora/executor:aurora_executor'),
     pants('src/main/python/apache/aurora/executor:thermos_task_runner'),
   ]
 )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 8968e89..806dc91 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -25,7 +25,7 @@ import os
 
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
 from apache.aurora.executor.common.health_checker import HealthCheckerProvider
-from apache.aurora.executor.thermos_executor import ThermosExecutor
+from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
 
 import mesos
@@ -56,14 +56,14 @@ def dump_runner_pex():
 
 def proxy_main():
   def main():
-    runner_provider = DefaultThermosTaskRunnerProvider(
+    thermos_runner_provider = DefaultThermosTaskRunnerProvider(
         dump_runner_pex(),
         artifact_dir=os.path.realpath('.'),
     )
 
     # Create executor stub
-    thermos_executor = ThermosExecutor(
-        runner_provider=runner_provider,
+    thermos_executor = AuroraExecutor(
+        runner_provider=thermos_runner_provider,
         status_providers=(HealthCheckerProvider(),),
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/executor_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/executor_base.py b/src/main/python/apache/aurora/executor/executor_base.py
index a1310cc..51309aa 100644
--- a/src/main/python/apache/aurora/executor/executor_base.py
+++ b/src/main/python/apache/aurora/executor/executor_base.py
@@ -14,18 +14,14 @@
 # limitations under the License.
 #
 
-from gen.apache.aurora.constants import TERMINAL_STATES as AURORA_TERMINAL_STATES
-from gen.apache.aurora.ttypes import ScheduleStatus
-from gen.apache.thermos.ttypes import TaskState
-
 import mesos
 import mesos_pb2 as mesos_pb
 from twitter.common import log
 
 
-class ThermosExecutorBase(mesos.Executor):
+class ExecutorBase(mesos.Executor):
   # Statuses are hard, let's go shopping.
-  MESOS_STATES = {
+  STATES_TO_STR = {
       mesos_pb.TASK_STARTING: 'STARTING',
       mesos_pb.TASK_RUNNING: 'RUNNING',
       mesos_pb.TASK_FINISHED: 'FINISHED',
@@ -34,45 +30,16 @@ class ThermosExecutorBase(mesos.Executor):
       mesos_pb.TASK_LOST: 'LOST',
   }
 
-  THERMOS_TO_MESOS_STATES = {
-      TaskState.ACTIVE: mesos_pb.TASK_RUNNING,
-      TaskState.SUCCESS: mesos_pb.TASK_FINISHED,
-      TaskState.FAILED: mesos_pb.TASK_FAILED,
-      TaskState.KILLED: mesos_pb.TASK_KILLED,
-      TaskState.LOST: mesos_pb.TASK_LOST,
-  }
-
-  THERMOS_TO_TWITTER_STATES = {
-      TaskState.ACTIVE: ScheduleStatus.RUNNING,
-      TaskState.CLEANING: ScheduleStatus.RUNNING,
-      TaskState.FINALIZING: ScheduleStatus.RUNNING,
-      TaskState.SUCCESS: ScheduleStatus.FINISHED,
-      TaskState.FAILED: ScheduleStatus.FAILED,
-      TaskState.KILLED: ScheduleStatus.KILLED,
-      TaskState.LOST: ScheduleStatus.LOST,
-  }
-
-  @staticmethod
-  def twitter_status_is_terminal(status):
-    return status in AURORA_TERMINAL_STATES
-
-  @staticmethod
-  def mesos_status_is_terminal(status):
-    return status in (
+  TERMINAL_STATES = frozenset([
         mesos_pb.TASK_FAILED,
         mesos_pb.TASK_FINISHED,
         mesos_pb.TASK_KILLED,
         mesos_pb.TASK_LOST,
-    )
+  ])
 
-  @staticmethod
-  def thermos_status_is_terminal(status):
-    return status in (
-        TaskState.FAILED,
-        TaskState.KILLED,
-        TaskState.LOST,
-        TaskState.SUCCESS,
-    )
+  @classmethod
+  def status_is_terminal(cls, status):
+    return status in cls.TERMINAL_STATES
 
   def __init__(self):
     self._slave_id = None
@@ -101,13 +68,13 @@ class ThermosExecutorBase(mesos.Executor):
     update = mesos_pb.TaskStatus()
     if not isinstance(state, int):
       raise TypeError('Invalid state type %s, should be int.' % type(state))
-    if state not in self.MESOS_STATES:
+    if state not in self.STATES_TO_STR:
       raise ValueError('Invalid state: %s' % state)
     update.state = state
     update.task_id.value = task_id
     if message:
       update.message = str(message)
-    self.log('Updating %s => %s' % (task_id, self.MESOS_STATES[state]))
+    self.log('Updating %s => %s' % (task_id, self.STATES_TO_STR[state]))
     self.log('   Reason: %s' % message)
     driver.sendStatusUpdate(update)
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/gc_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py
index b8b5026..4a866b2 100644
--- a/src/main/python/apache/aurora/executor/gc_executor.py
+++ b/src/main/python/apache/aurora/executor/gc_executor.py
@@ -37,10 +37,12 @@ from gen.apache.aurora.comm.ttypes import (
     AdjustRetainedTasks,
     DeletedTasks,
     SchedulerMessage)
+from gen.apache.aurora.constants import TERMINAL_STATES
 from gen.apache.aurora.ttypes import ScheduleStatus
+from gen.apache.thermos.ttypes import TaskState
 
 from .common.sandbox import DirectorySandbox, SandboxInterface
-from .executor_base import ThermosExecutorBase
+from .executor_base import ExecutorBase
 from .executor_detector import ExecutorDetector
 
 import mesos_pb2 as mesos_pb
@@ -54,7 +56,27 @@ from twitter.common.metrics.gauge import AtomicGauge
 from twitter.common.quantity import Amount, Time
 
 
-class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
+THERMOS_TO_TWITTER_STATES = {
+    TaskState.ACTIVE: ScheduleStatus.RUNNING,
+    TaskState.CLEANING: ScheduleStatus.RUNNING,
+    TaskState.FINALIZING: ScheduleStatus.RUNNING,
+    TaskState.SUCCESS: ScheduleStatus.FINISHED,
+    TaskState.FAILED: ScheduleStatus.FAILED,
+    TaskState.KILLED: ScheduleStatus.KILLED,
+    TaskState.LOST: ScheduleStatus.LOST,
+}
+
+
+THERMOS_TO_MESOS_STATES = {
+    TaskState.ACTIVE: mesos_pb.TASK_RUNNING,
+    TaskState.SUCCESS: mesos_pb.TASK_FINISHED,
+    TaskState.FAILED: mesos_pb.TASK_FAILED,
+    TaskState.KILLED: mesos_pb.TASK_KILLED,
+    TaskState.LOST: mesos_pb.TASK_LOST,
+}
+
+
+class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable):
   """
     Thermos GC Executor, responsible for:
       - garbage collecting old tasks to make sure they don't clutter up the system
@@ -89,7 +111,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
                executor_detector=ExecutorDetector,
                task_garbage_collector=TaskGarbageCollector,
                clock=time):
-    ThermosExecutorBase.__init__(self)
+    ExecutorBase.__init__(self)
     ExceptionalThread.__init__(self)
     self.daemon = True
     self._stop_event = threading.Event()
@@ -97,7 +119,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
                                         # the order in which they were received via a launchTask.
     self._driver = None   # cache the ExecutorDriver provided by the slave, so we can use it
                           # out of band from slave-initiated callbacks. This should be supplied by
-                          # ThermosExecutorBase.registered() when the executor first registers with
+                          # ExecutorBase.registered() when the executor first registers with
                           # the slave.
     self._slave_id = None # cache the slave ID provided by the slave
     self._task_id = None  # the task_id currently being executed by the ThermosGCExecutor, if any
@@ -220,7 +242,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
     def partition(rt):
       active, starting, finished = set(), set(), set()
       for task_id, schedule_status in rt.items():
-        if self.twitter_status_is_terminal(schedule_status):
+        if schedule_status in TERMINAL_STATES:
           finished.add(task_id)
         elif (schedule_status == ScheduleStatus.STARTING or
               schedule_status == ScheduleStatus.ASSIGNED):
@@ -263,11 +285,11 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable):
         states = self.get_states(task_id)
         if states:
           _, last_state = states[-1]
-          updates[task_id] = self.THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN)
+          updates[task_id] = THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN)
           self.send_update(
               driver,
               task_id,
-              self.THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST),
+              THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST),
               'Task finish detected by GC executor.')
         else:
           local_gc.update(self.should_gc_task(task_id))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_executor.py b/src/main/python/apache/aurora/executor/thermos_executor.py
deleted file mode 100644
index 252ba5f..0000000
--- a/src/main/python/apache/aurora/executor/thermos_executor.py
+++ /dev/null
@@ -1,304 +0,0 @@
-#
-# Copyright 2013 Apache Software Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import os
-import threading
-import time
-import traceback
-
-from .common.kill_manager import KillManager
-from .common.sandbox import DirectorySandbox, SandboxProvider
-from .common.status_checker import ChainedStatusChecker, ExitState
-from .common.task_info import (
-    assigned_task_from_mesos_task,
-    mesos_task_instance_from_assigned_task,
-    resolve_ports,
-)
-from .common.task_runner import (
-    TaskError,
-    TaskRunner,
-    TaskRunnerProvider,
-)
-from .executor_base import ThermosExecutorBase
-from .status_manager import StatusManager
-
-import mesos_pb2 as mesos_pb
-from twitter.common import log
-from twitter.common.concurrent import deadline, defer, Timeout
-from twitter.common.metrics import Observable
-from twitter.common.quantity import Amount, Time
-
-
-class DefaultSandboxProvider(SandboxProvider):
-  SANDBOX_NAME = 'sandbox'
-
-  def from_assigned_task(self, assigned_task):
-    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
-    return DirectorySandbox(
-        os.path.realpath(self.SANDBOX_NAME),
-        mesos_task.role().get())
-
-
-class ThermosExecutor(Observable, ThermosExecutorBase):
-  PERSISTENCE_WAIT = Amount(5, Time.SECONDS)
-  SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES)
-  START_TIMEOUT = Amount(2, Time.MINUTES)
-  STOP_TIMEOUT = Amount(2, Time.MINUTES)
-  STOP_WAIT = Amount(5, Time.SECONDS)
-
-  def __init__(self,
-               runner_provider,
-               status_manager_class=StatusManager,
-               sandbox_provider=DefaultSandboxProvider,
-               status_providers=(),
-               clock=time):
-
-    ThermosExecutorBase.__init__(self)
-    if not isinstance(runner_provider, TaskRunnerProvider):
-      raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' %
-          type(runner_provider))
-    self._runner = None
-    self._runner_provider = runner_provider
-    self._clock = clock
-    self._task_id = None
-    self._status_providers = status_providers
-    self._status_manager = None
-    self._status_manager_class = status_manager_class
-    self._sandbox = None
-    self._sandbox_provider = sandbox_provider()
-    self._kill_manager = KillManager()
-    # Events that are exposed for interested entities
-    self.runner_aborted = threading.Event()
-    self.runner_started = threading.Event()
-    self.sandbox_initialized = threading.Event()
-    self.sandbox_created = threading.Event()
-    self.terminated = threading.Event()
-    self.launched = threading.Event()
-
-  @property
-  def runner(self):
-    return self._runner
-
-  def _die(self, driver, status, msg):
-    log.fatal(msg)
-    self.send_update(driver, self._task_id, status, msg)
-    defer(driver.stop, delay=self.STOP_WAIT)
-
-  def _run(self, driver, assigned_task, mesos_task):
-    """
-      Commence running a Task.
-        - Initialize the sandbox
-        - Start the ThermosTaskRunner (fork the Thermos TaskRunner)
-        - Set up necessary HealthCheckers
-        - Set up DiscoveryManager, if applicable
-        - Set up ResourceCheckpointer
-        - Set up StatusManager, and attach HealthCheckers
-    """
-    self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.')
-
-    if not self._initialize_sandbox(driver, assigned_task):
-      return
-
-    # Fully resolve the portmap
-    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
-
-    # start the process on a separate thread and give the message processing thread back
-    # to the driver
-    try:
-      self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox)
-    except TaskError as e:
-      self._die(driver, mesos_pb.TASK_FAILED, str(e))
-      return
-
-    if not isinstance(self._runner, TaskRunner):
-      self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!')
-      return
-
-    if not self._start_runner(driver, assigned_task, mesos_task, portmap):
-      return
-
-    self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING)
-
-    self._start_status_manager(driver, assigned_task, mesos_task, portmap)
-
-  def _initialize_sandbox(self, driver, assigned_task):
-    self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)
-    self.sandbox_initialized.set()
-    try:
-      deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT,
-               daemon=True, propagate=True)
-    except Timeout:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!')
-      return
-    except self._sandbox.Error as e:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e)
-      return
-    self.sandbox_created.set()
-    return True
-
-  def _start_runner(self, driver, assigned_task, mesos_task, portmap):
-    if self.runner_aborted.is_set():
-      self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.')
-
-    try:
-      deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True)
-    except TaskError as e:
-      self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e)
-      return False
-    except Timeout:
-      self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!')
-      return False
-
-    self.runner_started.set()
-    log.debug('Task started.')
-
-    return True
-
-  def _start_status_manager(self, driver, assigned_task, mesos_task, portmap):
-    status_checkers = [self._kill_manager]
-    self.metrics.register_observable('kill_manager', self._kill_manager)
-
-    for status_provider in self._status_providers:
-      status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
-      if status_checker is None:
-        continue
-      status_checkers.append(status_checker)
-      # self.metrics.register_observable()
-
-    self._chained_checker = ChainedStatusChecker(status_checkers)
-    self._chained_checker.start()
-
-    # chain the runner to the other checkers, but do not chain .start()/.stop()
-    complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
-    self._status_manager = self._status_manager_class(
-        complete_checker, self._shutdown, clock=self._clock)
-    self._status_manager.start()
-
-  def _signal_kill_manager(self, driver, task_id, reason):
-    if self._task_id is None:
-      log.error('Was asked to kill task but no task running!')
-      return
-    if task_id != self._task_id:
-      log.error('Asked to kill a task other than what we are running!')
-      return
-    if not self.sandbox_created.is_set():
-      log.error('Asked to kill task with incomplete sandbox - aborting runner start')
-      self.runner_aborted.set()
-      return
-    self.log('Activating kill manager.')
-    self._kill_manager.kill(reason)
-
-  @classmethod
-  def translate_exit_state_to_mesos(cls, exit_state):
-    # Put into executor_base?
-    if exit_state == ExitState.FAILED:
-      return mesos_pb.TASK_FAILED
-    elif exit_state == ExitState.KILLED:
-      return mesos_pb.TASK_KILLED
-    elif exit_state == ExitState.FINISHED:
-      return mesos_pb.TASK_FINISHED
-    elif exit_state == ExitState.LOST:
-      return mesos_pb.TASK_LOST
-    log.error('Unknown exit state, defaulting to TASK_FINISHED.')
-    return mesos_pb.TASK_FINISHED
-
-  def _shutdown(self, status_result):
-    runner_status = self._runner.status
-
-    try:
-      deadline(self._runner.stop, timeout=self.STOP_TIMEOUT)
-    except Timeout:
-      log.error('Failed to stop runner within deadline.')
-
-    try:
-      deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT)
-    except Timeout:
-      log.error('Failed to stop all checkers within deadline.')
-
-    # If the runner was alive when _shutdown was called, defer to the status_result,
-    # otherwise the runner's terminal state is the preferred state.
-    exit_status = runner_status or status_result
-
-    self.send_update(
-        self._driver,
-        self._task_id,
-        self.translate_exit_state_to_mesos(exit_status.status),
-        status_result.reason)
-
-    self.terminated.set()
-    defer(self._driver.stop, delay=self.PERSISTENCE_WAIT)
-
-  """ Mesos Executor API methods follow """
-
-  def launchTask(self, driver, task):
-    """
-      Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks).
-      Note that this task can be realized with a thread, a process, or some simple computation,
-      however, no other callbacks will be invoked on this executor until this callback has returned.
-    """
-    self.launched.set()
-    self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value))
-
-    # TODO(wickman)  Update the tests to call registered(), then remove this line and issue
-    # an assert if self._driver is not populated.
-    self._driver = driver
-
-    if self._runner:
-      log.error('Already running a task! %s' % self._task_id)
-      self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST,
-          "Task already running on this executor: %s" % self._task_id)
-      return
-
-    self._slave_id = task.slave_id.value
-    self._task_id = task.task_id.value
-
-    try:
-      assigned_task = assigned_task_from_mesos_task(task)
-      mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
-    except Exception as e:
-      log.fatal('Could not deserialize AssignedTask')
-      log.fatal(traceback.format_exc())
-      self.send_update(
-          driver, self._task_id, mesos_pb.TASK_FAILED, "Could not deserialize task: %s" % e)
-      defer(driver.stop, delay=self.STOP_WAIT)
-      return
-
-    defer(lambda: self._run(driver, assigned_task, mesos_task))
-
-  def killTask(self, driver, task_id):
-    """
-     Invoked when a task running within this executor has been killed (via
-     SchedulerDriver::killTask). Note that no status update will be sent on behalf of the
executor,
-     the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and
invoking
-     ExecutorDriver::sendStatusUpdate.
-    """
-    self.log('killTask got task_id: %s' % task_id)
-    self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.")
-    self.log('killTask returned.')
-
-  def shutdown(self, driver):
-    """
-     Invoked when the executor should terminate all of its currently running tasks. Note that after
-     Mesos has determined that an executor has terminated any tasks that the executor did not send
-     terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc) a TASK_LOST
-     status update will be created.
-
-    """
-    self.log('shutdown called')
-    if self._task_id:
-      self.log('shutting down %s' % self._task_id)
-      self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.")
-    self.log('shutdown returned')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
index 8e18928..cd9428a 100644
--- a/src/main/python/apache/aurora/executor/thermos_task_runner.py
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -328,7 +328,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
   def from_assigned_task(self, assigned_task, sandbox):
     task_id = assigned_task.taskId
     role = assigned_task.task.owner.role
-    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    try:
+      mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+    except ValueError as e:
+      raise TaskError('Could not deserialize Thermos task from AssignedTask: %s' % e)
     mesos_ports = resolve_ports(mesos_task, assigned_task.assignedPorts)
 
     class ProvidedThermosTaskRunner(self._task_runner_class):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/BUILD b/src/test/python/apache/aurora/executor/BUILD
index 3f2387f..b1eb672 100644
--- a/src/test/python/apache/aurora/executor/BUILD
+++ b/src/test/python/apache/aurora/executor/BUILD
@@ -14,19 +14,37 @@
 # limitations under the License.
 #
 
-# TODO(jfarrell): AURORA-14 for executor_builds, gc_executor and thermos_executor
 python_test_suite(name = 'all',
   dependencies = [
-    # pants(':executor_builds'),
+    pants(':executor-small'),
+
+    # TODO(jfarrell): AURORA-14 for executor_builds, gc_executor and thermos_executor
+    # pants(':executor-large'),
+  ]
+)
+
+
+python_test_suite(
+  name = 'executor-small',
+  dependencies = [
     pants(':executor_detector'),
     pants(':executor_vars'),
-    # pants(':gc_executor'),
-    # pants(':thermos_executor'),
     pants(':thermos_task_runner'),
     pants('src/test/python/apache/aurora/executor/common:all'),
+  ],
+)
+
+
+python_test_suite(
+  name = 'executor-large',
+  dependencies = [
+    pants(':executor_builds'),
+    pants(':gc_executor'),
+    pants(':thermos_executor'),
   ]
 )
 
+
 python_tests(name = 'gc_executor',
   sources = ['test_gc_executor.py'],
   dependencies = [
@@ -42,7 +60,6 @@ python_tests(name = 'gc_executor',
     pants('src/main/python/apache/aurora/executor:gc_executor'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ],
-  coverage = 'twitter.mesos.executor.gc_executor'
 )
 
 python_tests(name = 'executor_detector',
@@ -69,7 +86,7 @@ python_tests(name = 'thermos_executor',
     pants('src/main/python/apache/aurora/executor/common:health_checker'),
     pants('src/main/python/apache/aurora/executor/common:sandbox'),
     pants('src/main/python/apache/aurora/executor/common:task_runner'),
-    pants('src/main/python/apache/aurora/executor:thermos_executor'),
+    pants('src/main/python/apache/aurora/executor:aurora_executor'),
     pants('src/main/python/apache/aurora/executor:thermos_task_runner'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/aurora/executor/test_thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py
index 3810ea3..b508eab 100644
--- a/src/test/python/apache/aurora/executor/test_thermos_executor.py
+++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py
@@ -42,7 +42,7 @@ from apache.aurora.executor.thermos_task_runner import (
     DefaultThermosTaskRunnerProvider,
     ThermosTaskRunner,
 )
-from apache.aurora.executor.thermos_executor import ThermosExecutor
+from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.thermos.common.path import TaskPath
 from apache.thermos.core.runner import TaskRunner
 from apache.thermos.monitoring.monitor import TaskMonitor
@@ -71,7 +71,7 @@ if 'THERMOS_DEBUG' in os.environ:
   log.init('executor_logger')
 
 
-class FastThermosExecutor(ThermosExecutor):
+class FastThermosExecutor(AuroraExecutor):
   STOP_WAIT = Amount(0, Time.SECONDS)
 
 
@@ -132,6 +132,9 @@ class ProxyDriver(object):
     self.method_calls['stop'].append((args, kw))
     self._stop_event.set()
 
+  def wait_stopped(self):
+    return self._stop_event.wait()
+
 
 def make_task(thermos_config, assigned_ports={}, **kw):
   role = getpass.getuser()
@@ -281,7 +284,7 @@ class TestThermosExecutor(object):
     proxy_driver = ProxyDriver()
 
     with temporary_dir() as tempdir:
-      te = ThermosExecutor(
+      te = AuroraExecutor(
           runner_provider=make_provider(tempdir),
           sandbox_provider=DefaultTestSandboxProvider)
       te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
@@ -303,7 +306,7 @@ class TestThermosExecutor(object):
     proxy_driver = ProxyDriver()
 
     with temporary_dir() as tempdir:
-      te = ThermosExecutor(
+      te = AuroraExecutor(
           runner_provider=make_provider(tempdir),
           sandbox_provider=DefaultTestSandboxProvider)
       te.launchTask(proxy_driver, make_task(MESOS_JOB(task=HELLO_WORLD), instanceId=0))
@@ -432,9 +435,7 @@ class TestThermosExecutor(object):
           runner_provider=runner_provider,
           sandbox_provider=DefaultTestSandboxProvider)
       te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
-
-      proxy_driver._stop_event.wait(timeout=1.0)
-      assert proxy_driver._stop_event.is_set()
+      proxy_driver.wait_stopped()
 
       updates = proxy_driver.method_calls['sendStatusUpdate']
       assert updates[-1][0][0].state == mesos_pb.TASK_FAILED
@@ -447,9 +448,7 @@ class TestThermosExecutor(object):
           runner_provider=make_provider(td),
           sandbox_provider=FailingSandboxProvider)
       te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
-
-      proxy_driver._stop_event.wait(timeout=1.0)
-      assert proxy_driver._stop_event.is_set()
+      proxy_driver.wait_stopped()
 
       updates = proxy_driver.method_calls['sendStatusUpdate']
       assert updates[-1][0][0].state == mesos_pb.TASK_FAILED
@@ -465,9 +464,7 @@ class TestThermosExecutor(object):
           sandbox_provider=SlowSandboxProvider)
       te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS)
       te.launchTask(proxy_driver, task)
-
-      proxy_driver._stop_event.wait(timeout=1.0)
-      assert proxy_driver._stop_event.is_set()
+      proxy_driver.wait_stopped()
 
       updates = proxy_driver.method_calls['sendStatusUpdate']
       assert len(updates) == 2
@@ -497,12 +494,11 @@ class TestThermosExecutor(object):
       # however, wait on the runner to definitely finish its initialization before continuing
       # (otherwise, this function races ahead too fast)
       te._sandbox._init_done.wait()
-      te.sandbox_created.wait(1.0)
+      te.sandbox_created.wait()
       assert te.sandbox_initialized.is_set()
       assert te.sandbox_created.is_set()
 
-      proxy_driver._stop_event.wait(timeout=1.0)
-      assert proxy_driver._stop_event.is_set()
+      proxy_driver.wait_stopped()
 
       updates = proxy_driver.method_calls['sendStatusUpdate']
       assert len(updates) == 2
@@ -511,28 +507,32 @@ class TestThermosExecutor(object):
   def test_launchTask_deserialization_fail(self):
     proxy_driver = ProxyDriver()
 
+    role = getpass.getuser()
     task_info = mesos_pb.TaskInfo()
     task_info.name = task_info.task_id.value = 'broken'
-    task_info.data = serialize(AssignedTask(task=TaskConfig(executorConfig=ExecutorConfig(
-        name=AURORA_EXECUTOR_NAME,
-        data='garbage'))))
+    task_info.data = serialize(
+        AssignedTask(
+            task=TaskConfig(
+                owner=Identity(role=role, user=role),
+                executorConfig=ExecutorConfig(name=AURORA_EXECUTOR_NAME, data='garbage'))))
 
-    te = ThermosExecutor(
+    te = FastThermosExecutor(
         runner_provider=make_provider(safe_mkdtemp()),
         sandbox_provider=DefaultTestSandboxProvider)
     te.launchTask(proxy_driver, task_info)
+    proxy_driver.wait_stopped()
 
     updates = proxy_driver.method_calls['sendStatusUpdate']
-    assert len(updates) == 1
-    assert updates[0][0][0].state == mesos_pb.TASK_FAILED
+    assert len(updates) == 2
+    assert updates[0][0][0].state == mesos_pb.TASK_STARTING
+    assert updates[1][0][0].state == mesos_pb.TASK_FAILED
 
 
 def test_waiting_executor():
   proxy_driver = ProxyDriver()
   with temporary_dir() as checkpoint_root:
-    te = ThermosExecutor(
+    te = AuroraExecutor(
         runner_provider=make_provider(checkpoint_root),
         sandbox_provider=DefaultTestSandboxProvider)
     ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start()
-    proxy_driver._stop_event.wait(timeout=1.0)
-    assert proxy_driver._stop_event.is_set()
+    proxy_driver.wait_stopped()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/thermos/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/common/BUILD b/src/test/python/apache/thermos/common/BUILD
index 7bbd826..14a55a0 100644
--- a/src/test/python/apache/thermos/common/BUILD
+++ b/src/test/python/apache/thermos/common/BUILD
@@ -27,7 +27,6 @@ python_tests(name = 'test_pathspec',
   dependencies = [
     pants('src/main/python/apache/thermos/common:path'),
   ],
-  coverage = 'apache.thermos.base.path'
 )
 
 python_tests(name = 'test_planner',
@@ -35,7 +34,6 @@ python_tests(name = 'test_planner',
   dependencies = [
     pants('src/main/python/apache/thermos/common:planner'),
   ],
-  coverage = 'apache.thermos.base.planner'
 )
 
 python_tests(name = 'test_task_planner',

