Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0697C11D4C for ; Fri, 21 Feb 2014 19:47:41 +0000 (UTC) Received: (qmail 40878 invoked by uid 500); 21 Feb 2014 19:47:40 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 40864 invoked by uid 500); 21 Feb 2014 19:47:39 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 40857 invoked by uid 99); 21 Feb 2014 19:47:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Feb 2014 19:47:39 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 21 Feb 2014 19:47:31 +0000 Received: (qmail 37968 invoked by uid 99); 21 Feb 2014 19:47:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Feb 2014 19:47:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8115E82EF79; Fri, 21 Feb 2014 19:47:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wickman@apache.org To: commits@aurora.incubator.apache.org Message-Id: <40cf5d2093f34f079f4493cae1ce4464@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: AURORA-186: rename ThermosExecutor to AuroraExecutor, clean up thermos-isms from ExecutorBase Date: Fri, 21 Feb 2014 19:47:08 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 445ecd778 -> e6cd6a9ea AURORA-186: rename ThermosExecutor to AuroraExecutor, clean up thermos-isms from ExecutorBase split all thermos-specific stuff out of thermos_executor.py Testing Done: this adds executor-small and executor-large, the latter of which runs the executor tests. Both ./pants src/test/python/apache/aurora/executor:executor-large and e2e passes locally for me. Bugs closed: AURORA-186 Reviewed at https://reviews.apache.org/r/18239/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e6cd6a9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e6cd6a9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e6cd6a9e Branch: refs/heads/master Commit: e6cd6a9eaa2f0a9a38ec710765bd4b36df085a09 Parents: 445ecd7 Author: Brian Wickman Authored: Fri Feb 21 11:46:55 2014 -0800 Committer: Brian Wickman Committed: Fri Feb 21 11:46:55 2014 -0800 ---------------------------------------------------------------------- src/main/python/apache/aurora/executor/BUILD | 13 +- .../python/apache/aurora/executor/__init__.py | 2 +- .../apache/aurora/executor/aurora_executor.py | 301 ++++++++++++++++++ .../python/apache/aurora/executor/bin/BUILD | 2 +- .../executor/bin/thermos_executor_main.py | 8 +- .../apache/aurora/executor/executor_base.py | 51 +--- .../apache/aurora/executor/gc_executor.py | 36 ++- .../apache/aurora/executor/thermos_executor.py | 304 ------------------- .../aurora/executor/thermos_task_runner.py | 5 +- src/test/python/apache/aurora/executor/BUILD | 29 +- .../aurora/executor/test_thermos_executor.py | 50 +-- src/test/python/apache/thermos/common/BUILD | 2 - 12 files changed, 403 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/BUILD b/src/main/python/apache/aurora/executor/BUILD index 667975b..9f0cad0 100644 --- a/src/main/python/apache/aurora/executor/BUILD +++ b/src/main/python/apache/aurora/executor/BUILD @@ -67,22 +67,20 @@ python_library( ) python_library( - name = 'thermos_executor_base', + name = 'executor_base', sources = ['executor_base.py'], dependencies = [ pants('3rdparty/python:mesos'), pants('3rdparty/python:twitter.common.log'), - pants('src/main/thrift/org/apache/thermos:py-thrift'), - pants('src/main/thrift/org/apache/aurora/gen:py-thrift'), ] ) python_library( - name = 'thermos_executor', - sources = ['thermos_executor.py'], + name = 'aurora_executor', + sources = ['aurora_executor.py'], dependencies = [ pants(':status_manager'), - pants(':thermos_executor_base'), + pants(':executor_base'), pants('3rdparty/python:mesos'), pants('3rdparty/python:pystachio'), pants('3rdparty/python:twitter.common.app'), @@ -113,7 +111,7 @@ python_library( sources = ['gc_executor.py'], dependencies = [ pants(':executor_detector'), - pants(':thermos_executor_base'), + pants(':executor_base'), pants('3rdparty/python:mesos'), pants('3rdparty/python:psutil'), pants('3rdparty/python:twitter.common.collections'), @@ -130,6 +128,7 @@ python_library( pants('src/main/python/apache/aurora/config:schema'), pants('src/main/python/apache/aurora/executor/common:sandbox'), pants('src/main/thrift/org/apache/aurora/gen:py-thrift'), + pants('src/main/thrift/org/apache/thermos:py-thrift'), ] ) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/__init__.py b/src/main/python/apache/aurora/executor/__init__.py index d54d88c..6cfe8d6 100644 --- a/src/main/python/apache/aurora/executor/__init__.py +++ b/src/main/python/apache/aurora/executor/__init__.py @@ -14,4 +14,4 @@ # limitations under the License. # -__import__('pkg_resources').declare_namespace(__name__) \ No newline at end of file +__import__('pkg_resources').declare_namespace(__name__) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/aurora_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py new file mode 100644 index 0000000..28e5054 --- /dev/null +++ b/src/main/python/apache/aurora/executor/aurora_executor.py @@ -0,0 +1,301 @@ +# +# Copyright 2013 Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import threading +import time +import traceback + +from .common.kill_manager import KillManager +from .common.sandbox import DirectorySandbox, SandboxProvider +from .common.status_checker import ChainedStatusChecker, ExitState +from .common.task_info import assigned_task_from_mesos_task +from .common.task_runner import ( + TaskError, + TaskRunner, + TaskRunnerProvider, +) +from .executor_base import ExecutorBase +from .status_manager import StatusManager + +import mesos_pb2 as mesos_pb +from twitter.common import log +from twitter.common.concurrent import deadline, defer, Timeout +from twitter.common.metrics import Observable +from twitter.common.quantity import Amount, Time + + +class DefaultSandboxProvider(SandboxProvider): + SANDBOX_NAME = 'sandbox' + + def from_assigned_task(self, assigned_task): + return DirectorySandbox( + os.path.realpath(self.SANDBOX_NAME), + assigned_task.task.owner.role) + + +class AuroraExecutor(ExecutorBase, Observable): + PERSISTENCE_WAIT = Amount(5, Time.SECONDS) + SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES) + START_TIMEOUT = Amount(2, Time.MINUTES) + STOP_TIMEOUT = Amount(2, Time.MINUTES) + STOP_WAIT = Amount(5, Time.SECONDS) + + def __init__(self, + runner_provider, + status_manager_class=StatusManager, + sandbox_provider=DefaultSandboxProvider, + status_providers=(), + clock=time): + + ExecutorBase.__init__(self) + if not isinstance(runner_provider, TaskRunnerProvider): + raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' % + type(runner_provider)) + self._runner = None + self._runner_provider = runner_provider + self._clock = clock + self._task_id = None + self._status_providers = status_providers + self._status_manager = None + self._status_manager_class = status_manager_class + self._sandbox = None + self._sandbox_provider = sandbox_provider() + self._kill_manager = KillManager() + # Events that are exposed for interested entities + self.runner_aborted = threading.Event() + self.runner_started = threading.Event() + self.sandbox_initialized = threading.Event() + self.sandbox_created = threading.Event() + self.terminated = threading.Event() + self.launched = threading.Event() + + @property + def runner(self): + return self._runner + + def _die(self, driver, status, msg): + log.fatal(msg) + self.send_update(driver, self._task_id, status, msg) + defer(driver.stop, delay=self.STOP_WAIT) + + def _run(self, driver, assigned_task): + """ + Commence running a Task. + - Initialize the sandbox + - Start the ThermosTaskRunner (fork the Thermos TaskRunner) + - Set up necessary HealthCheckers + - Set up StatusManager, and attach HealthCheckers + """ + self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.') + + if not self._initialize_sandbox(driver, assigned_task): + return + + # start the process on a separate thread and give the message processing thread back + # to the driver + try: + self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox) + except TaskError as e: + self.runner_aborted.set() + self._die(driver, mesos_pb.TASK_FAILED, str(e)) + return + + if not isinstance(self._runner, TaskRunner): + self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!') + return + + if not self._start_runner(driver, assigned_task): + return + + self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING) + + self._start_status_manager(driver, assigned_task) + + def _initialize_sandbox(self, driver, assigned_task): + self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task) + self.sandbox_initialized.set() + try: + deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT, + daemon=True, propagate=True) + except Timeout: + self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!') + return + except self._sandbox.Error as e: + self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e) + return + self.sandbox_created.set() + return True + + def _start_runner(self, driver, assigned_task): + if self.runner_aborted.is_set(): + self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.') + + try: + deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True) + except TaskError as e: + self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e) + return False + except Timeout: + self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!') + return False + + self.runner_started.set() + log.debug('Task started.') + + return True + + def _start_status_manager(self, driver, assigned_task): + status_checkers = [self._kill_manager] + self.metrics.register_observable('kill_manager', self._kill_manager) + + for status_provider in self._status_providers: + status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) + if status_checker is None: + continue + status_checkers.append(status_checker) + # self.metrics.register_observable() + + self._chained_checker = ChainedStatusChecker(status_checkers) + self._chained_checker.start() + + # chain the runner to the other checkers, but do not chain .start()/.stop() + complete_checker = ChainedStatusChecker([self._runner, self._chained_checker]) + self._status_manager = self._status_manager_class( + complete_checker, self._shutdown, clock=self._clock) + self._status_manager.start() + + def _signal_kill_manager(self, driver, task_id, reason): + if self._task_id is None: + log.error('Was asked to kill task but no task running!') + return + if task_id != self._task_id: + log.error('Asked to kill a task other than what we are running!') + return + if not self.sandbox_created.is_set(): + log.error('Asked to kill task with incomplete sandbox - aborting runner start') + self.runner_aborted.set() + return + self.log('Activating kill manager.') + self._kill_manager.kill(reason) + + @classmethod + def translate_exit_state_to_mesos(cls, exit_state): + # Put into executor_base? + if exit_state == ExitState.FAILED: + return mesos_pb.TASK_FAILED + elif exit_state == ExitState.KILLED: + return mesos_pb.TASK_KILLED + elif exit_state == ExitState.FINISHED: + return mesos_pb.TASK_FINISHED + elif exit_state == ExitState.LOST: + return mesos_pb.TASK_LOST + log.error('Unknown exit state, defaulting to TASK_FINISHED.') + return mesos_pb.TASK_FINISHED + + def _shutdown(self, status_result): + runner_status = self._runner.status + + try: + deadline(self._runner.stop, timeout=self.STOP_TIMEOUT) + except Timeout: + log.error('Failed to stop runner within deadline.') + + try: + deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT) + except Timeout: + log.error('Failed to stop all checkers within deadline.') + + # If the runner was alive when _shutdown was called, defer to the status_result, + # otherwise the runner's terminal state is the preferred state. + exit_status = runner_status or status_result + + self.send_update( + self._driver, + self._task_id, + self.translate_exit_state_to_mesos(exit_status.status), + status_result.reason) + + self.terminated.set() + defer(self._driver.stop, delay=self.PERSISTENCE_WAIT) + + @classmethod + def validate_task(cls, task): + try: + assigned_task = assigned_task_from_mesos_task(task) + return assigned_task + except Exception as e: + log.fatal('Could not deserialize AssignedTask') + log.fatal(traceback.format_exc()) + return None + + """ Mesos Executor API methods follow """ + + def launchTask(self, driver, task): + """ + Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks). + Note that this task can be realized with a thread, a process, or some simple computation, + however, no other callbacks will be invoked on this executor until this callback has returned. + """ + self.launched.set() + self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value)) + + # TODO(wickman) Update the tests to call registered(), then remove this line and issue + # an assert if self._driver is not populated. + self._driver = driver + + if self._runner: + log.error('Already running a task! %s' % self._task_id) + self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST, + "Task already running on this executor: %s" % self._task_id) + return + + self._slave_id = task.slave_id.value + self._task_id = task.task_id.value + + assigned_task = self.validate_task(task) + if not assigned_task: + self.send_update(driver, self._task_id, mesos_pb.TASK_FAILED, + 'Could not deserialize task.') + defer(driver.stop, delay=self.STOP_WAIT) + return + + defer(lambda: self._run(driver, assigned_task)) + + def killTask(self, driver, task_id): + """ + Invoked when a task running within this executor has been killed (via + SchedulerDriver::killTask). Note that no status update will be sent on behalf of the executor, + the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and invoking + ExecutorDriver::sendStatusUpdate. + """ + self.log('killTask got task_id: %s' % task_id) + self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.") + self.log('killTask returned.') + + def shutdown(self, driver): + """ + Invoked when the executor should terminate all of its currently running tasks. Note that after + Mesos has determined that an executor has terminated any tasks that the executor did not send + terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc) a TASK_LOST + status update will be created. + + """ + self.log('shutdown called') + if self._task_id: + self.log('shutting down %s' % self._task_id) + self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.") + self.log('shutdown returned') http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/bin/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD index 2dcc69b..6d5330b 100644 --- a/src/main/python/apache/aurora/executor/bin/BUILD +++ b/src/main/python/apache/aurora/executor/bin/BUILD @@ -29,7 +29,7 @@ python_binary( pants('src/main/python/apache/aurora/executor/common:sandbox'), pants('src/main/python/apache/aurora/executor:executor_detector'), pants('src/main/python/apache/aurora/executor:executor_vars'), - pants('src/main/python/apache/aurora/executor:thermos_executor'), + pants('src/main/python/apache/aurora/executor:aurora_executor'), pants('src/main/python/apache/aurora/executor:thermos_task_runner'), ] ) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 8968e89..806dc91 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -25,7 +25,7 @@ import os from apache.aurora.executor.common.executor_timeout import ExecutorTimeout from apache.aurora.executor.common.health_checker import HealthCheckerProvider -from apache.aurora.executor.thermos_executor import ThermosExecutor +from apache.aurora.executor.aurora_executor import AuroraExecutor from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider import mesos @@ -56,14 +56,14 @@ def dump_runner_pex(): def proxy_main(): def main(): - runner_provider = DefaultThermosTaskRunnerProvider( + thermos_runner_provider = DefaultThermosTaskRunnerProvider( dump_runner_pex(), artifact_dir=os.path.realpath('.'), ) # Create executor stub - thermos_executor = ThermosExecutor( - runner_provider=runner_provider, + thermos_executor = AuroraExecutor( + runner_provider=thermos_runner_provider, status_providers=(HealthCheckerProvider(),), ) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/executor_base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/executor_base.py b/src/main/python/apache/aurora/executor/executor_base.py index a1310cc..51309aa 100644 --- a/src/main/python/apache/aurora/executor/executor_base.py +++ b/src/main/python/apache/aurora/executor/executor_base.py @@ -14,18 +14,14 @@ # limitations under the License. # -from gen.apache.aurora.constants import TERMINAL_STATES as AURORA_TERMINAL_STATES -from gen.apache.aurora.ttypes import ScheduleStatus -from gen.apache.thermos.ttypes import TaskState - import mesos import mesos_pb2 as mesos_pb from twitter.common import log -class ThermosExecutorBase(mesos.Executor): +class ExecutorBase(mesos.Executor): # Statuses are hard, let's go shopping. - MESOS_STATES = { + STATES_TO_STR = { mesos_pb.TASK_STARTING: 'STARTING', mesos_pb.TASK_RUNNING: 'RUNNING', mesos_pb.TASK_FINISHED: 'FINISHED', @@ -34,45 +30,16 @@ class ThermosExecutorBase(mesos.Executor): mesos_pb.TASK_LOST: 'LOST', } - THERMOS_TO_MESOS_STATES = { - TaskState.ACTIVE: mesos_pb.TASK_RUNNING, - TaskState.SUCCESS: mesos_pb.TASK_FINISHED, - TaskState.FAILED: mesos_pb.TASK_FAILED, - TaskState.KILLED: mesos_pb.TASK_KILLED, - TaskState.LOST: mesos_pb.TASK_LOST, - } - - THERMOS_TO_TWITTER_STATES = { - TaskState.ACTIVE: ScheduleStatus.RUNNING, - TaskState.CLEANING: ScheduleStatus.RUNNING, - TaskState.FINALIZING: ScheduleStatus.RUNNING, - TaskState.SUCCESS: ScheduleStatus.FINISHED, - TaskState.FAILED: ScheduleStatus.FAILED, - TaskState.KILLED: ScheduleStatus.KILLED, - TaskState.LOST: ScheduleStatus.LOST, - } - - @staticmethod - def twitter_status_is_terminal(status): - return status in AURORA_TERMINAL_STATES - - @staticmethod - def mesos_status_is_terminal(status): - return status in ( + TERMINAL_STATES = frozenset([ mesos_pb.TASK_FAILED, mesos_pb.TASK_FINISHED, mesos_pb.TASK_KILLED, mesos_pb.TASK_LOST, - ) + ]) - @staticmethod - def thermos_status_is_terminal(status): - return status in ( - TaskState.FAILED, - TaskState.KILLED, - TaskState.LOST, - TaskState.SUCCESS, - ) + @classmethod + def status_is_terminal(cls, status): + return status in cls.TERMINAL_STATES def __init__(self): self._slave_id = None @@ -101,13 +68,13 @@ class ThermosExecutorBase(mesos.Executor): update = mesos_pb.TaskStatus() if not isinstance(state, int): raise TypeError('Invalid state type %s, should be int.' % type(state)) - if state not in self.MESOS_STATES: + if state not in self.STATES_TO_STR: raise ValueError('Invalid state: %s' % state) update.state = state update.task_id.value = task_id if message: update.message = str(message) - self.log('Updating %s => %s' % (task_id, self.MESOS_STATES[state])) + self.log('Updating %s => %s' % (task_id, self.STATES_TO_STR[state])) self.log(' Reason: %s' % message) driver.sendStatusUpdate(update) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/gc_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py index b8b5026..4a866b2 100644 --- a/src/main/python/apache/aurora/executor/gc_executor.py +++ b/src/main/python/apache/aurora/executor/gc_executor.py @@ -37,10 +37,12 @@ from gen.apache.aurora.comm.ttypes import ( AdjustRetainedTasks, DeletedTasks, SchedulerMessage) +from gen.apache.aurora.constants import TERMINAL_STATES from gen.apache.aurora.ttypes import ScheduleStatus +from gen.apache.thermos.ttypes import TaskState from .common.sandbox import DirectorySandbox, SandboxInterface -from .executor_base import ThermosExecutorBase +from .executor_base import ExecutorBase from .executor_detector import ExecutorDetector import mesos_pb2 as mesos_pb @@ -54,7 +56,27 @@ from twitter.common.metrics.gauge import AtomicGauge from twitter.common.quantity import Amount, Time -class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable): +THERMOS_TO_TWITTER_STATES = { + TaskState.ACTIVE: ScheduleStatus.RUNNING, + TaskState.CLEANING: ScheduleStatus.RUNNING, + TaskState.FINALIZING: ScheduleStatus.RUNNING, + TaskState.SUCCESS: ScheduleStatus.FINISHED, + TaskState.FAILED: ScheduleStatus.FAILED, + TaskState.KILLED: ScheduleStatus.KILLED, + TaskState.LOST: ScheduleStatus.LOST, +} + + +THERMOS_TO_MESOS_STATES = { + TaskState.ACTIVE: mesos_pb.TASK_RUNNING, + TaskState.SUCCESS: mesos_pb.TASK_FINISHED, + TaskState.FAILED: mesos_pb.TASK_FAILED, + TaskState.KILLED: mesos_pb.TASK_KILLED, + TaskState.LOST: mesos_pb.TASK_LOST, +} + + +class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable): """ Thermos GC Executor, responsible for: - garbage collecting old tasks to make sure they don't clutter up the system @@ -89,7 +111,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable): executor_detector=ExecutorDetector, task_garbage_collector=TaskGarbageCollector, clock=time): - ThermosExecutorBase.__init__(self) + ExecutorBase.__init__(self) ExceptionalThread.__init__(self) self.daemon = True self._stop_event = threading.Event() @@ -97,7 +119,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable): # the order in which they were received via a launchTask. self._driver = None # cache the ExecutorDriver provided by the slave, so we can use it # out of band from slave-initiated callbacks. This should be supplied by - # ThermosExecutorBase.registered() when the executor first registers with + # ExecutorBase.registered() when the executor first registers with # the slave. self._slave_id = None # cache the slave ID provided by the slave self._task_id = None # the task_id currently being executed by the ThermosGCExecutor, if any @@ -220,7 +242,7 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable): def partition(rt): active, starting, finished = set(), set(), set() for task_id, schedule_status in rt.items(): - if self.twitter_status_is_terminal(schedule_status): + if schedule_status in TERMINAL_STATES: finished.add(task_id) elif (schedule_status == ScheduleStatus.STARTING or schedule_status == ScheduleStatus.ASSIGNED): @@ -263,11 +285,11 @@ class ThermosGCExecutor(ThermosExecutorBase, ExceptionalThread, Observable): states = self.get_states(task_id) if states: _, last_state = states[-1] - updates[task_id] = self.THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN) + updates[task_id] = THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.UNKNOWN) self.send_update( driver, task_id, - self.THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST), + THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb.TASK_LOST), 'Task finish detected by GC executor.') else: local_gc.update(self.should_gc_task(task_id)) http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/thermos_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/thermos_executor.py b/src/main/python/apache/aurora/executor/thermos_executor.py deleted file mode 100644 index 252ba5f..0000000 --- a/src/main/python/apache/aurora/executor/thermos_executor.py +++ /dev/null @@ -1,304 +0,0 @@ -# -# Copyright 2013 Apache Software Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import threading -import time -import traceback - -from .common.kill_manager import KillManager -from .common.sandbox import DirectorySandbox, SandboxProvider -from .common.status_checker import ChainedStatusChecker, ExitState -from .common.task_info import ( - assigned_task_from_mesos_task, - mesos_task_instance_from_assigned_task, - resolve_ports, -) -from .common.task_runner import ( - TaskError, - TaskRunner, - TaskRunnerProvider, -) -from .executor_base import ThermosExecutorBase -from .status_manager import StatusManager - -import mesos_pb2 as mesos_pb -from twitter.common import log -from twitter.common.concurrent import deadline, defer, Timeout -from twitter.common.metrics import Observable -from twitter.common.quantity import Amount, Time - - -class DefaultSandboxProvider(SandboxProvider): - SANDBOX_NAME = 'sandbox' - - def from_assigned_task(self, assigned_task): - mesos_task = mesos_task_instance_from_assigned_task(assigned_task) - return DirectorySandbox( - os.path.realpath(self.SANDBOX_NAME), - mesos_task.role().get()) - - -class ThermosExecutor(Observable, ThermosExecutorBase): - PERSISTENCE_WAIT = Amount(5, Time.SECONDS) - SANDBOX_INITIALIZATION_TIMEOUT = Amount(10, Time.MINUTES) - START_TIMEOUT = Amount(2, Time.MINUTES) - STOP_TIMEOUT = Amount(2, Time.MINUTES) - STOP_WAIT = Amount(5, Time.SECONDS) - - def __init__(self, - runner_provider, - status_manager_class=StatusManager, - sandbox_provider=DefaultSandboxProvider, - status_providers=(), - clock=time): - - ThermosExecutorBase.__init__(self) - if not isinstance(runner_provider, TaskRunnerProvider): - raise TypeError('runner_provider must be a TaskRunnerProvider, got %s' % - type(runner_provider)) - self._runner = None - self._runner_provider = runner_provider - self._clock = clock - self._task_id = None - self._status_providers = status_providers - self._status_manager = None - self._status_manager_class = status_manager_class - self._sandbox = None - self._sandbox_provider = sandbox_provider() - self._kill_manager = KillManager() - # Events that are exposed for interested entities - self.runner_aborted = threading.Event() - self.runner_started = threading.Event() - self.sandbox_initialized = threading.Event() - self.sandbox_created = threading.Event() - self.terminated = threading.Event() - self.launched = threading.Event() - - @property - def runner(self): - return self._runner - - def _die(self, driver, status, msg): - log.fatal(msg) - self.send_update(driver, self._task_id, status, msg) - defer(driver.stop, delay=self.STOP_WAIT) - - def _run(self, driver, assigned_task, mesos_task): - """ - Commence running a Task. - - Initialize the sandbox - - Start the ThermosTaskRunner (fork the Thermos TaskRunner) - - Set up necessary HealthCheckers - - Set up DiscoveryManager, if applicable - - Set up ResourceCheckpointer - - Set up StatusManager, and attach HealthCheckers - """ - self.send_update(driver, self._task_id, mesos_pb.TASK_STARTING, 'Initializing sandbox.') - - if not self._initialize_sandbox(driver, assigned_task): - return - - # Fully resolve the portmap - portmap = resolve_ports(mesos_task, assigned_task.assignedPorts) - - # start the process on a separate thread and give the message processing thread back - # to the driver - try: - self._runner = self._runner_provider.from_assigned_task(assigned_task, self._sandbox) - except TaskError as e: - self._die(driver, mesos_pb.TASK_FAILED, str(e)) - return - - if not isinstance(self._runner, TaskRunner): - self._die(driver, mesos_pb.TASK_FAILED, 'Unrecognized task!') - return - - if not self._start_runner(driver, assigned_task, mesos_task, portmap): - return - - self.send_update(driver, self._task_id, mesos_pb.TASK_RUNNING) - - self._start_status_manager(driver, assigned_task, mesos_task, portmap) - - def _initialize_sandbox(self, driver, assigned_task): - self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task) - self.sandbox_initialized.set() - try: - deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT, - daemon=True, propagate=True) - except Timeout: - self._die(driver, mesos_pb.TASK_FAILED, 'Timed out waiting for sandbox to initialize!') - return - except self._sandbox.Error as e: - self._die(driver, mesos_pb.TASK_FAILED, 'Failed to initialize sandbox: %s' % e) - return - self.sandbox_created.set() - return True - - def _start_runner(self, driver, assigned_task, mesos_task, portmap): - if self.runner_aborted.is_set(): - self._die(driver, mesos_pb.TASK_KILLED, 'Task killed during initialization.') - - try: - deadline(self._runner.start, timeout=self.START_TIMEOUT, propagate=True) - except TaskError as e: - self._die(driver, mesos_pb.TASK_FAILED, 'Task initialization failed: %s' % e) - return False - except Timeout: - self._die(driver, mesos_pb.TASK_LOST, 'Timed out waiting for task to start!') - return False - - self.runner_started.set() - log.debug('Task started.') - - return True - - def _start_status_manager(self, driver, assigned_task, mesos_task, portmap): - status_checkers = [self._kill_manager] - self.metrics.register_observable('kill_manager', self._kill_manager) - - for status_provider in self._status_providers: - status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) - if status_checker is None: - continue - status_checkers.append(status_checker) - # self.metrics.register_observable() - - self._chained_checker = ChainedStatusChecker(status_checkers) - self._chained_checker.start() - - # chain the runner to the other checkers, but do not chain .start()/.stop() - complete_checker = ChainedStatusChecker([self._runner, self._chained_checker]) - self._status_manager = self._status_manager_class( - complete_checker, self._shutdown, clock=self._clock) - self._status_manager.start() - - def _signal_kill_manager(self, driver, task_id, reason): - if self._task_id is None: - log.error('Was asked to kill task but no task running!') - return - if task_id != self._task_id: - log.error('Asked to kill a task other than what we are running!') - return - if not self.sandbox_created.is_set(): - log.error('Asked to kill task with incomplete sandbox - aborting runner start') - self.runner_aborted.set() - return - self.log('Activating kill manager.') - self._kill_manager.kill(reason) - - @classmethod - def translate_exit_state_to_mesos(cls, exit_state): - # Put into executor_base? - if exit_state == ExitState.FAILED: - return mesos_pb.TASK_FAILED - elif exit_state == ExitState.KILLED: - return mesos_pb.TASK_KILLED - elif exit_state == ExitState.FINISHED: - return mesos_pb.TASK_FINISHED - elif exit_state == ExitState.LOST: - return mesos_pb.TASK_LOST - log.error('Unknown exit state, defaulting to TASK_FINISHED.') - return mesos_pb.TASK_FINISHED - - def _shutdown(self, status_result): - runner_status = self._runner.status - - try: - deadline(self._runner.stop, timeout=self.STOP_TIMEOUT) - except Timeout: - log.error('Failed to stop runner within deadline.') - - try: - deadline(self._chained_checker.stop, timeout=self.STOP_TIMEOUT) - except Timeout: - log.error('Failed to stop all checkers within deadline.') - - # If the runner was alive when _shutdown was called, defer to the status_result, - # otherwise the runner's terminal state is the preferred state. - exit_status = runner_status or status_result - - self.send_update( - self._driver, - self._task_id, - self.translate_exit_state_to_mesos(exit_status.status), - status_result.reason) - - self.terminated.set() - defer(self._driver.stop, delay=self.PERSISTENCE_WAIT) - - """ Mesos Executor API methods follow """ - - def launchTask(self, driver, task): - """ - Invoked when a task has been launched on this executor (initiated via Scheduler::launchTasks). - Note that this task can be realized with a thread, a process, or some simple computation, - however, no other callbacks will be invoked on this executor until this callback has returned. - """ - self.launched.set() - self.log('launchTask got task: %s:%s' % (task.name, task.task_id.value)) - - # TODO(wickman) Update the tests to call registered(), then remove this line and issue - # an assert if self._driver is not populated. - self._driver = driver - - if self._runner: - log.error('Already running a task! %s' % self._task_id) - self.send_update(driver, task.task_id.value, mesos_pb.TASK_LOST, - "Task already running on this executor: %s" % self._task_id) - return - - self._slave_id = task.slave_id.value - self._task_id = task.task_id.value - - try: - assigned_task = assigned_task_from_mesos_task(task) - mesos_task = mesos_task_instance_from_assigned_task(assigned_task) - except Exception as e: - log.fatal('Could not deserialize AssignedTask') - log.fatal(traceback.format_exc()) - self.send_update( - driver, self._task_id, mesos_pb.TASK_FAILED, "Could not deserialize task: %s" % e) - defer(driver.stop, delay=self.STOP_WAIT) - return - - defer(lambda: self._run(driver, assigned_task, mesos_task)) - - def killTask(self, driver, task_id): - """ - Invoked when a task running within this executor has been killed (via - SchedulerDriver::killTask). Note that no status update will be sent on behalf of the executor, - the executor is responsible for creating a new TaskStatus (i.e., with TASK_KILLED) and invoking - ExecutorDriver::sendStatusUpdate. - """ - self.log('killTask got task_id: %s' % task_id) - self._signal_kill_manager(driver, task_id.value, "Instructed to kill task.") - self.log('killTask returned.') - - def shutdown(self, driver): - """ - Invoked when the executor should terminate all of its currently running tasks. Note that after - Mesos has determined that an executor has terminated any tasks that the executor did not send - terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc) a TASK_LOST - status update will be created. - - """ - self.log('shutdown called') - if self._task_id: - self.log('shutting down %s' % self._task_id) - self._signal_kill_manager(driver, self._task_id, "Told to shut down executor.") - self.log('shutdown returned') http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/main/python/apache/aurora/executor/thermos_task_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py index 8e18928..cd9428a 100644 --- a/src/main/python/apache/aurora/executor/thermos_task_runner.py +++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py @@ -328,7 +328,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): def from_assigned_task(self, assigned_task, sandbox): task_id = assigned_task.taskId role = assigned_task.task.owner.role - mesos_task = mesos_task_instance_from_assigned_task(assigned_task) + try: + mesos_task = mesos_task_instance_from_assigned_task(assigned_task) + except ValueError as e: + raise TaskError('Could not deserialize Thermos task from AssignedTask: %s' % e) mesos_ports = resolve_ports(mesos_task, assigned_task.assignedPorts) class ProvidedThermosTaskRunner(self._task_runner_class): http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/aurora/executor/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/BUILD b/src/test/python/apache/aurora/executor/BUILD index 3f2387f..b1eb672 100644 --- a/src/test/python/apache/aurora/executor/BUILD +++ b/src/test/python/apache/aurora/executor/BUILD @@ -14,19 +14,37 @@ # limitations under the License. # -# TODO(jfarrell): AURORA-14 for executor_builds, gc_executor and thermos_executor python_test_suite(name = 'all', dependencies = [ - # pants(':executor_builds'), + pants(':executor-small'), + + # TODO(jfarrell): AURORA-14 for executor_builds, gc_executor and thermos_executor + # pants(':executor-large'), + ] +) + + +python_test_suite( + name = 'executor-small', + dependencies = [ pants(':executor_detector'), pants(':executor_vars'), - # pants(':gc_executor'), - # pants(':thermos_executor'), pants(':thermos_task_runner'), pants('src/test/python/apache/aurora/executor/common:all'), + ], +) + + +python_test_suite( + name = 'executor-large', + dependencies = [ + pants(':executor_builds'), + pants(':gc_executor'), + pants(':thermos_executor'), ] ) + python_tests(name = 'gc_executor', sources = ['test_gc_executor.py'], dependencies = [ @@ -42,7 +60,6 @@ python_tests(name = 'gc_executor', pants('src/main/python/apache/aurora/executor:gc_executor'), pants('src/main/thrift/org/apache/aurora/gen:py-thrift'), ], - coverage = 'twitter.mesos.executor.gc_executor' ) python_tests(name = 'executor_detector', @@ -69,7 +86,7 @@ python_tests(name = 'thermos_executor', pants('src/main/python/apache/aurora/executor/common:health_checker'), pants('src/main/python/apache/aurora/executor/common:sandbox'), pants('src/main/python/apache/aurora/executor/common:task_runner'), - pants('src/main/python/apache/aurora/executor:thermos_executor'), + pants('src/main/python/apache/aurora/executor:aurora_executor'), pants('src/main/python/apache/aurora/executor:thermos_task_runner'), pants('src/main/thrift/org/apache/aurora/gen:py-thrift'), ] http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/aurora/executor/test_thermos_executor.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py index 3810ea3..b508eab 100644 --- a/src/test/python/apache/aurora/executor/test_thermos_executor.py +++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py @@ -42,7 +42,7 @@ from apache.aurora.executor.thermos_task_runner import ( DefaultThermosTaskRunnerProvider, ThermosTaskRunner, ) -from apache.aurora.executor.thermos_executor import ThermosExecutor +from apache.aurora.executor.aurora_executor import AuroraExecutor from apache.thermos.common.path import TaskPath from apache.thermos.core.runner import TaskRunner from apache.thermos.monitoring.monitor import TaskMonitor @@ -71,7 +71,7 @@ if 'THERMOS_DEBUG' in os.environ: log.init('executor_logger') -class FastThermosExecutor(ThermosExecutor): +class FastThermosExecutor(AuroraExecutor): STOP_WAIT = Amount(0, Time.SECONDS) @@ -132,6 +132,9 @@ class ProxyDriver(object): self.method_calls['stop'].append((args, kw)) self._stop_event.set() + def wait_stopped(self): + return self._stop_event.wait() + def make_task(thermos_config, assigned_ports={}, **kw): role = getpass.getuser() @@ -281,7 +284,7 @@ class TestThermosExecutor(object): proxy_driver = ProxyDriver() with temporary_dir() as tempdir: - te = ThermosExecutor( + te = AuroraExecutor( runner_provider=make_provider(tempdir), sandbox_provider=DefaultTestSandboxProvider) te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI)) @@ -303,7 +306,7 @@ class TestThermosExecutor(object): proxy_driver = ProxyDriver() with temporary_dir() as tempdir: - te = ThermosExecutor( + te = AuroraExecutor( runner_provider=make_provider(tempdir), sandbox_provider=DefaultTestSandboxProvider) te.launchTask(proxy_driver, make_task(MESOS_JOB(task=HELLO_WORLD), instanceId=0)) @@ -432,9 +435,7 @@ class TestThermosExecutor(object): runner_provider=runner_provider, sandbox_provider=DefaultTestSandboxProvider) te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI)) - - proxy_driver._stop_event.wait(timeout=1.0) - assert proxy_driver._stop_event.is_set() + proxy_driver.wait_stopped() updates = proxy_driver.method_calls['sendStatusUpdate'] assert updates[-1][0][0].state == mesos_pb.TASK_FAILED @@ -447,9 +448,7 @@ class TestThermosExecutor(object): runner_provider=make_provider(td), sandbox_provider=FailingSandboxProvider) te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI)) - - proxy_driver._stop_event.wait(timeout=1.0) - assert proxy_driver._stop_event.is_set() + proxy_driver.wait_stopped() updates = proxy_driver.method_calls['sendStatusUpdate'] assert updates[-1][0][0].state == mesos_pb.TASK_FAILED @@ -465,9 +464,7 @@ class TestThermosExecutor(object): sandbox_provider=SlowSandboxProvider) te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS) te.launchTask(proxy_driver, task) - - proxy_driver._stop_event.wait(timeout=1.0) - assert proxy_driver._stop_event.is_set() + proxy_driver.wait_stopped() updates = proxy_driver.method_calls['sendStatusUpdate'] assert len(updates) == 2 @@ -497,12 +494,11 @@ class TestThermosExecutor(object): # however, wait on the runner to definitely finish its initialization before continuing # (otherwise, this function races ahead too fast) te._sandbox._init_done.wait() - te.sandbox_created.wait(1.0) + te.sandbox_created.wait() assert te.sandbox_initialized.is_set() assert te.sandbox_created.is_set() - proxy_driver._stop_event.wait(timeout=1.0) - assert proxy_driver._stop_event.is_set() + proxy_driver.wait_stopped() updates = proxy_driver.method_calls['sendStatusUpdate'] assert len(updates) == 2 @@ -511,28 +507,32 @@ class TestThermosExecutor(object): def test_launchTask_deserialization_fail(self): proxy_driver = ProxyDriver() + role = getpass.getuser() task_info = mesos_pb.TaskInfo() task_info.name = task_info.task_id.value = 'broken' - task_info.data = serialize(AssignedTask(task=TaskConfig(executorConfig=ExecutorConfig( - name=AURORA_EXECUTOR_NAME, - data='garbage')))) + task_info.data = serialize( + AssignedTask( + task=TaskConfig( + owner=Identity(role=role, user=role), + executorConfig=ExecutorConfig(name=AURORA_EXECUTOR_NAME, data='garbage')))) - te = ThermosExecutor( + te = FastThermosExecutor( runner_provider=make_provider(safe_mkdtemp()), sandbox_provider=DefaultTestSandboxProvider) te.launchTask(proxy_driver, task_info) + proxy_driver.wait_stopped() updates = proxy_driver.method_calls['sendStatusUpdate'] - assert len(updates) == 1 - assert updates[0][0][0].state == mesos_pb.TASK_FAILED + assert len(updates) == 2 + assert updates[0][0][0].state == mesos_pb.TASK_STARTING + assert updates[1][0][0].state == mesos_pb.TASK_FAILED def test_waiting_executor(): proxy_driver = ProxyDriver() with temporary_dir() as checkpoint_root: - te = ThermosExecutor( + te = AuroraExecutor( runner_provider=make_provider(checkpoint_root), sandbox_provider=DefaultTestSandboxProvider) ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start() - proxy_driver._stop_event.wait(timeout=1.0) - assert proxy_driver._stop_event.is_set() + proxy_driver.wait_stopped() http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e6cd6a9e/src/test/python/apache/thermos/common/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/common/BUILD b/src/test/python/apache/thermos/common/BUILD index 7bbd826..14a55a0 100644 --- a/src/test/python/apache/thermos/common/BUILD +++ b/src/test/python/apache/thermos/common/BUILD @@ -27,7 +27,6 @@ python_tests(name = 'test_pathspec', dependencies = [ pants('src/main/python/apache/thermos/common:path'), ], - coverage = 'apache.thermos.base.path' ) python_tests(name = 'test_planner', @@ -35,7 +34,6 @@ python_tests(name = 'test_planner', dependencies = [ pants('src/main/python/apache/thermos/common:planner'), ], - coverage = 'apache.thermos.base.planner' ) python_tests(name = 'test_task_planner',