From: jcohen@apache.org
To: commits@aurora.apache.org
Subject: aurora git commit: Modify executor state transition logic to rely on health checks (if enabled).
Date: Fri, 30 Sep 2016 19:54:35 +0000 (UTC)

Repository: aurora
Updated Branches: refs/heads/master 59b4d319b -> ca683cb9e

Modify executor state transition logic to rely on health checks (if enabled).

[Summary]
The executor needs to start executing user content in the STARTING state and transition to RUNNING once the required number of consecutive successful health checks is reached.

[Background]
Please see the epic https://issues.apache.org/jira/browse/AURORA-1225 and the design doc https://docs.google.com/document/d/1ZdgW8S4xMhvKW7iQUX99xZm10NXSxEWR0a-21FP5d94/edit# for more details and background.

[Description]
If health checking is enabled on the vCurrent executor, the health checker sends a "TASK_RUNNING" message once the required number of consecutive successful health checks is reached within initial_interval_secs. Conversely, a "TASK_FAILED" message is sent if the health checker fails to reach that number of successes within the initial interval, or if the maximum consecutive health check failure limit is exceeded after initial_interval_secs.

If health checking is disabled on the vCurrent executor, it sends the "TASK_RUNNING" message to the scheduler after the thermos runner has started. In this scenario, the behavior of the vCurrent executor is the same as the vPrev executor.

[Change List]
The current change set includes:
1. Removed the status memoization in ChainedStatusChecker.
2. Modified the StatusManager to be edge triggered.
3. Changed the Aurora Executor callback function.
4. Modified the Health Checker and redefined the meaning of initial_interval_secs.
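----------------------------------------------------------------------
Illustrative sketch (not part of this commit): the transition rule described above, reduced to a small, self-contained Python model. The class name HealthCheckStateTracker and its methods are invented for this example only; the actual implementation is ThreadedHealthChecker and StatusManager in the diff below. A task earns TASK_RUNNING by accumulating min_consecutive_successes within initial_interval_secs, and is marked TASK_FAILED if it fails to do so before the interval expires, or if it exceeds max_consecutive_failures afterwards.

import time


class HealthCheckStateTracker(object):
  """Simplified model (illustration only) of the edge-triggered transition
  logic: counts consecutive successes/failures and decides which task state
  should be reported."""

  def __init__(self, initial_interval_secs, max_consecutive_failures,
               min_consecutive_successes, clock=time):
    self.initial_interval = initial_interval_secs
    self.max_consecutive_failures = max_consecutive_failures
    self.min_consecutive_successes = min_consecutive_successes
    self.clock = clock
    self.start_time = self.clock.time()
    self.consecutive_failures = 0
    self.consecutive_successes = 0
    self.health_check_passed = False  # set once RUNNING has been earned
    self.healthy = True               # cleared once the task is considered failed

  def record(self, is_healthy):
    """Feed one health check result and update the counters."""
    if is_healthy:
      self.consecutive_failures = 0
      self.consecutive_successes += 1
    else:
      self.consecutive_successes = 0
      self.consecutive_failures += 1

    expired = self.clock.time() - self.start_time > self.initial_interval
    if not expired:
      # Inside the initial interval the only question is whether enough
      # consecutive successes have been observed to transition to RUNNING.
      if self.consecutive_successes >= self.min_consecutive_successes:
        self.health_check_passed = True
    else:
      # After the initial interval: never having passed, or exceeding the
      # failure limit afterwards, marks the task as failed.
      if not self.health_check_passed:
        self.healthy = False
      elif self.consecutive_failures > self.max_consecutive_failures:
        self.healthy = False

  def status(self):
    """Analogue of a status checker's status property: None while undecided,
    otherwise the task state the executor should report."""
    if not self.healthy:
      return 'TASK_FAILED'
    if self.health_check_passed:
      return 'TASK_RUNNING'
    return None  # still waiting inside initial_interval_secs

The StatusManager in this change polls such a status every POLL_WAIT, invokes the running callback exactly once on the first TASK_RUNNING, keeps polling while the task stays RUNNING, and invokes the shutdown callback on any other (terminal) status.
----------------------------------------------------------------------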
Bugs closed: AURORA-1225 Reviewed at https://reviews.apache.org/r/51876/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ca683cb9 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ca683cb9 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ca683cb9 Branch: refs/heads/master Commit: ca683cb9e27bae76424a687bc6c3af5a73c501b9 Parents: 59b4d31 Author: Kai Huang Authored: Fri Sep 30 14:54:15 2016 -0500 Committer: Joshua Cohen Committed: Fri Sep 30 14:54:15 2016 -0500 ---------------------------------------------------------------------- .../apache/aurora/executor/aurora_executor.py | 46 ++++++++++---- .../aurora/executor/common/health_checker.py | 59 ++++++++++++----- .../aurora/executor/common/status_checker.py | 29 +++++---- .../apache/aurora/executor/status_manager.py | 33 ++++++++-- .../executor/common/test_health_checker.py | 67 +++++++++++++------- .../executor/common/test_status_checker.py | 37 ++++++++++- .../aurora/executor/test_status_manager.py | 28 ++++---- .../aurora/executor/test_thermos_executor.py | 44 ++++++++++--- 8 files changed, 250 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/aurora_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py index ce5ef68..d30f2d3 100644 --- a/src/main/python/apache/aurora/executor/aurora_executor.py +++ b/src/main/python/apache/aurora/executor/aurora_executor.py @@ -22,6 +22,8 @@ from twitter.common.concurrent import Timeout, deadline, defer from twitter.common.metrics import Observable from twitter.common.quantity import Amount, Time +from apache.aurora.executor.common.health_checker import HealthChecker + from .common.kill_manager import KillManager from .common.sandbox import DefaultSandboxProvider from .common.status_checker import ChainedStatusChecker @@ -94,7 +96,7 @@ class AuroraExecutor(ExecutorBase, Observable): - Set up necessary HealthCheckers - Set up StatusManager, and attach HealthCheckers """ - self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Initializing sandbox.') + self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Starting task execution.') if not self._initialize_sandbox(driver, assigned_task, mounted_volume_paths): return @@ -115,10 +117,27 @@ class AuroraExecutor(ExecutorBase, Observable): if not self._start_runner(driver, assigned_task): return - self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING) + is_health_check_enabled = False + status_checkers = [] + try: + for status_provider in self._status_providers: + status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) + if status_checker is None: + continue + else: + if isinstance(status_checker, HealthChecker): + is_health_check_enabled = True + status_checkers.append(status_checker) + except Exception as e: + log.error(traceback.format_exc()) + self._die(driver, mesos_pb2.TASK_FAILED, "Failed to set up status checker: %s" % e) + return + + if not is_health_check_enabled: + self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING) try: - self._start_status_manager(driver, assigned_task) + self._start_status_manager(status_checkers) except Exception: log.error(traceback.format_exc()) self._die(driver, 
mesos_pb2.TASK_FAILED, "Internal error") @@ -162,15 +181,9 @@ class AuroraExecutor(ExecutorBase, Observable): return True - def _start_status_manager(self, driver, assigned_task): - status_checkers = [self._kill_manager] - self.metrics.register_observable(self._kill_manager.name(), self._kill_manager) - - for status_provider in self._status_providers: - status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox) - if status_checker is None: - continue - status_checkers.append(status_checker) + def _start_status_manager(self, status_checkers): + status_checkers = [self._kill_manager] + status_checkers + for status_checker in status_checkers: self.metrics.register_observable(status_checker.name(), status_checker) self._chained_checker = ChainedStatusChecker(status_checkers) @@ -178,8 +191,12 @@ class AuroraExecutor(ExecutorBase, Observable): # chain the runner to the other checkers, but do not chain .start()/.stop() complete_checker = ChainedStatusChecker([self._runner, self._chained_checker]) + self._status_manager = self._status_manager_class( - complete_checker, self._shutdown, clock=self._clock) + complete_checker, + running=self._running, + shutdown=self._shutdown, + clock=self._clock) self._status_manager.start() self.status_manager_started.set() @@ -197,6 +214,9 @@ class AuroraExecutor(ExecutorBase, Observable): self.log('Activating kill manager.') self._kill_manager.kill(reason) + def _running(self, status_result): + self.send_update(self._driver, self._task_id, status_result.status, status_result.reason) + def _shutdown(self, status_result): runner_status = self._runner.status http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/common/health_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py index 3c7c09d..03fbffd 100644 --- a/src/main/python/apache/aurora/executor/common/health_checker.py +++ b/src/main/python/apache/aurora/executor/common/health_checker.py @@ -18,7 +18,7 @@ import threading import time import traceback -from mesos.interface.mesos_pb2 import TaskState +from mesos.interface import mesos_pb2 from pystachio import Environment, String from twitter.common import log from twitter.common.exceptions import ExceptionalThread @@ -51,6 +51,7 @@ class ThreadedHealthChecker(ExceptionalThread): interval_secs, initial_interval_secs, max_consecutive_failures, + min_consecutive_successes, clock): """ :param health_checker: health checker to confirm service health @@ -59,10 +60,12 @@ class ThreadedHealthChecker(ExceptionalThread): :type sandbox: DirectorySandbox :param interval_secs: delay between checks :type interval_secs: int - :param initial_interval_secs: seconds to wait before starting checks + :param initial_interval_secs: seconds to wait before marking health check passed :type initial_interval_secs: int :param max_consecutive_failures: number of failures to allow before marking dead :type max_consecutive_failures: int + :param min_consecutive_successes: number of successes before marking health check passed + :type min_consecutive_successes: int :param clock: time module available to be mocked for testing :type clock: time module """ @@ -70,11 +73,16 @@ class ThreadedHealthChecker(ExceptionalThread): self.sandbox = sandbox self.clock = clock self.current_consecutive_failures = 0 + self.current_consecutive_successes = 0 self.dead = 
threading.Event() self.interval = interval_secs self.max_consecutive_failures = max_consecutive_failures + self.min_consecutive_successes = min_consecutive_successes self.snooze_file = None self.snoozed = False + self._expired = False + self.start_time = 0 + self.health_check_passed = False if self.sandbox and self.sandbox.exists(): self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze') @@ -84,10 +92,8 @@ class ThreadedHealthChecker(ExceptionalThread): else: self.initial_interval = interval_secs * 2 - if self.initial_interval > 0: - self.healthy, self.reason = True, None - else: - self.healthy, self.reason = self._perform_check_if_not_disabled() + self.healthy, self.reason = True, None + super(ThreadedHealthChecker, self).__init__() self.daemon = True @@ -110,21 +116,35 @@ class ThreadedHealthChecker(ExceptionalThread): def _maybe_update_failure_count(self, is_healthy, reason): if not is_healthy: log.warning('Health check failure: %s' % reason) + self.reason = reason + if self.current_consecutive_successes > 0: + log.debug('Reset consecutive successes counter.') + self.current_consecutive_successes = 0 self.current_consecutive_failures += 1 - if self.current_consecutive_failures > self.max_consecutive_failures: - log.warning('Reached consecutive failure limit.') - self.healthy = False - self.reason = reason else: if self.current_consecutive_failures > 0: log.debug('Reset consecutive failures counter.') self.current_consecutive_failures = 0 + self.current_consecutive_successes += 1 + + if not self._expired: + if self.clock.time() - self.start_time > self.initial_interval: + log.debug('Initial interval expired.') + self._expired = True + if not self.health_check_passed: + log.warning('Failed to reach minimum consecutive successes.') + self.healthy = False + else: + if self.current_consecutive_successes >= self.min_consecutive_successes: + log.info('Reached minimum consecutive successes.') + self.health_check_passed = True + + if self._expired and self.healthy: + self.healthy = self.current_consecutive_failures <= self.max_consecutive_failures def run(self): log.debug('Health checker thread started.') - if self.initial_interval > 0: - self.clock.sleep(self.initial_interval) - log.debug('Initial interval expired.') + self.start_time = self.clock.time() while not self.dead.is_set(): is_healthy, reason = self._perform_check_if_not_disabled() self._maybe_update_failure_count(is_healthy, reason) @@ -148,6 +168,8 @@ class HealthChecker(StatusChecker): Exported metrics: health_checker.consecutive_failures: Number of consecutive failures observed. Resets to zero on successful health check. + health_checker.consecutive_successes: Number of consecutive successes observed. Resets + to zero on failed health check. health_checker.snoozed: Returns 1 if the health checker is snoozed, 0 if not. health_checker.total_latency_secs: Total time waiting for the health checker to respond in seconds. To get average latency, use health_checker.total_latency / health_checker.checks. 
@@ -160,6 +182,7 @@ class HealthChecker(StatusChecker): interval_secs=10, initial_interval_secs=None, max_consecutive_failures=0, + min_consecutive_successes=1, clock=time): self._health_checks = 0 self._total_latency = 0 @@ -171,9 +194,12 @@ class HealthChecker(StatusChecker): interval_secs, initial_interval_secs, max_consecutive_failures, + min_consecutive_successes, clock) self.metrics.register(LambdaGauge('consecutive_failures', lambda: self.threaded_health_checker.current_consecutive_failures)) + self.metrics.register(LambdaGauge('consecutive_successes', + lambda: self.threaded_health_checker.current_consecutive_successes)) self.metrics.register(LambdaGauge('snoozed', lambda: int(self.threaded_health_checker.snoozed))) self.metrics.register(LambdaGauge('total_latency_secs', lambda: self._total_latency)) self.metrics.register(LambdaGauge('checks', lambda: self._health_checks)) @@ -192,9 +218,12 @@ class HealthChecker(StatusChecker): @property def status(self): - if not self.threaded_health_checker.healthy: + if self.threaded_health_checker.healthy: + if self.threaded_health_checker.health_check_passed: + return StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) + else: return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason, - TaskState.Value('TASK_FAILED')) + mesos_pb2.TASK_FAILED) def name(self): return 'health_checker' http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/common/status_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py index 795dae2..fccb69a 100644 --- a/src/main/python/apache/aurora/executor/common/status_checker.py +++ b/src/main/python/apache/aurora/executor/common/status_checker.py @@ -14,7 +14,7 @@ from abc import abstractmethod, abstractproperty -from mesos.interface.mesos_pb2 import TaskState +from mesos.interface import mesos_pb2 from twitter.common import log from twitter.common.lang import Interface from twitter.common.metrics import Observable @@ -31,7 +31,7 @@ class StatusResult(object): def __init__(self, reason, status): self._reason = reason - if status not in TaskState.values(): + if status not in mesos_pb2.TaskState.values(): raise ValueError('Unknown task state: %r' % status) self._status = status @@ -47,7 +47,7 @@ class StatusResult(object): return '%s(%r, status=%r)' % ( self.__class__.__name__, self._reason, - TaskState.Name(self._status)) + mesos_pb2.TaskState.Name(self._status)) class StatusChecker(Observable, Interface): @@ -85,24 +85,25 @@ class Healthy(StatusChecker): class ChainedStatusChecker(StatusChecker): def __init__(self, status_checkers): self._status_checkers = status_checkers - self._status = None if not all(isinstance(h_i, StatusChecker) for h_i in status_checkers): raise TypeError('ChainedStatusChecker must take an iterable of StatusCheckers.') super(ChainedStatusChecker, self).__init__() @property def status(self): - if self._status is None: - for status_checker in self._status_checkers: - status_checker_status = status_checker.status - if status_checker_status is not None: - log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status)) - if not isinstance(status_checker_status, StatusResult): - raise TypeError('StatusChecker returned something other than a StatusResult: got %s' % - type(status_checker_status)) - self._status = status_checker_status + # 
Do not memoize the status so that failures occurring in the RUNNING state won't be masked. + status = None + for status_checker in self._status_checkers: + status_checker_status = status_checker.status + if status_checker_status is not None: + log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status)) + if not isinstance(status_checker_status, StatusResult): + raise TypeError('StatusChecker returned something other than a StatusResult: got %s' % + type(status_checker_status)) + status = status_checker_status + if status_checker_status.status is not mesos_pb2.TASK_RUNNING: break - return self._status + return status def start(self): for status_checker in self._status_checkers: http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/status_manager.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py index 228a99a..26aea23 100644 --- a/src/main/python/apache/aurora/executor/status_manager.py +++ b/src/main/python/apache/aurora/executor/status_manager.py @@ -14,6 +14,7 @@ import time +from mesos.interface import mesos_pb2 from twitter.common import log from twitter.common.exceptions import ExceptionalThread from twitter.common.quantity import Amount, Time @@ -31,23 +32,43 @@ class StatusManager(ExceptionalThread): """ POLL_WAIT = Amount(500, Time.MILLISECONDS) - def __init__(self, status_checker, callback, clock=time): + def __init__(self, status_checker, running, shutdown, clock=time): + """ + :param status_checker: status checker to check the task status + :type status_checker: ChainedStatusChecker + :param running: callback function that handles the RUNNING task status + :type running: callable + :param shutdown: callback function that handles the terminal task status + :type shutdown: callable + :param clock: time module available to be mocked for testing + :type clock: time module + """ if not isinstance(status_checker, StatusChecker): raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker)) - if not callable(callback): - raise TypeError('callback needs to be callable!') + if not callable(running): + raise TypeError('running callback needs to be callable!') + if not callable(shutdown): + raise TypeError('shutdown callback needs to be callable!') self._status_checker = status_checker - self._callback = callback + self._running = running + self._shutdown = shutdown self._clock = clock super(StatusManager, self).__init__() self.daemon = True + self.is_task_running = False def run(self): while True: status_result = self._status_checker.status if status_result is not None: log.info('Status manager got %s' % status_result) - self._callback(status_result) - break + if status_result.status is mesos_pb2.TASK_RUNNING: + if not self.is_task_running: + self._running(status_result) + self.is_task_running = True + self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) + else: + self._shutdown(status_result) + break else: self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS)) http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/common/test_health_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py index da0c56c..28769dc 100644 --- 
a/src/test/python/apache/aurora/executor/common/test_health_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py @@ -20,7 +20,7 @@ import unittest import mock import pytest -from mesos.interface.mesos_pb2 import TaskState +from mesos.interface import mesos_pb2 from twitter.common.exceptions import ExceptionalThread from twitter.common.testing.clock import ThreadedClock @@ -59,23 +59,28 @@ class TestHealthChecker(unittest.TestCase): self.fake_health_checks.append((status, 'reason')) def test_initial_interval_2x(self): - self.append_health_checks(False) + self.append_health_checks(False, 2) + self.append_health_checks(True, 1) + self.append_health_checks(False, 1) hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock) hct.start() assert self._clock.converge(threads=[hct.threaded_health_checker]) - self._clock.assert_waiting(hct.threaded_health_checker, 10) + self._clock.assert_waiting(hct.threaded_health_checker, amount=5) assert hct.status is None - self._clock.tick(6) + self._clock.tick(5) assert self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, amount=5) assert hct.status is None - self._clock.tick(3) + self._clock.tick(5) assert self._clock.converge(threads=[hct.threaded_health_checker]) - assert hct.status is None + self._clock.assert_waiting(hct.threaded_health_checker, amount=5) + assert hct.status.status == mesos_pb2.TASK_RUNNING self._clock.tick(5) assert self._clock.converge(threads=[hct.threaded_health_checker]) - assert hct.status.status == TaskState.Value('TASK_FAILED') + self._clock.assert_waiting(hct.threaded_health_checker, amount=5) + assert hct.status.status == mesos_pb2.TASK_FAILED hct.stop() - assert self._checker.health.call_count == 1 + assert self._checker.health.call_count == 4 def test_initial_interval_whatev(self): self.append_health_checks(False, 2) @@ -87,7 +92,11 @@ class TestHealthChecker(unittest.TestCase): hct.start() self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=5) - assert hct.status.status == TaskState.Value('TASK_FAILED') + assert hct.status is None + self._clock.tick(5) + self._clock.converge(threads=[hct.threaded_health_checker]) + self._clock.assert_waiting(hct.threaded_health_checker, amount=5) + assert hct.status.status == mesos_pb2.TASK_FAILED hct.stop() # this is an implementation detail -- we healthcheck in the initializer and # healthcheck in the run loop. if we ever change the implementation, expect @@ -111,38 +120,36 @@ class TestHealthChecker(unittest.TestCase): self._clock.converge(threads=[hct.threaded_health_checker]) # 2 consecutive health check failures followed by a successful health check. 
- epsilon = 0.001 - self._clock.tick(initial_interval_secs + epsilon) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) assert hct.status is None assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + assert hct.status.status == mesos_pb2.TASK_RUNNING assert hct.metrics.sample()['consecutive_failures'] == 0 # 3 consecutive health check failures. - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + assert hct.status.status == mesos_pb2.TASK_RUNNING assert hct.metrics.sample()['consecutive_failures'] == 1 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status is None + assert hct.status.status == mesos_pb2.TASK_RUNNING assert hct.metrics.sample()['consecutive_failures'] == 2 - self._clock.tick(interval_secs + epsilon) + self._clock.tick(interval_secs) self._clock.converge(threads=[hct.threaded_health_checker]) self._clock.assert_waiting(hct.threaded_health_checker, amount=1) - assert hct.status.status == TaskState.Value('TASK_FAILED') + assert hct.status.status == mesos_pb2.TASK_FAILED assert hct.metrics.sample()['consecutive_failures'] == 3 hct.stop() assert self._checker.health.call_count == 6 @@ -439,9 +446,10 @@ class TestThreadedHealthChecker(unittest.TestCase): self.sandbox.exists.return_value = True self.sandbox.root = '/root' - self.initial_interval_secs = 1 - self.interval_secs = 5 + self.initial_interval_secs = 2 + self.interval_secs = 1 self.max_consecutive_failures = 2 + self.min_consecutive_successes = 1 self.clock = mock.Mock(spec=time) self.clock.time.return_value = 1.0 self.health_checker = HealthChecker( @@ -450,6 +458,7 @@ class TestThreadedHealthChecker(unittest.TestCase): self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, + self.min_consecutive_successes, self.clock) self.health_checker_sandbox_exists = HealthChecker( self.health, @@ -457,6 +466,7 @@ class TestThreadedHealthChecker(unittest.TestCase): self.interval_secs, self.initial_interval_secs, self.max_consecutive_failures, + self.min_consecutive_successes, self.clock) def test_perform_check_if_not_disabled_snooze_file_is_none(self): @@ -491,17 +501,28 @@ class TestThreadedHealthChecker(unittest.TestCase): hc = self.health_checker.threaded_health_checker assert hc.current_consecutive_failures == 0 + assert hc.current_consecutive_successes == 0 + assert hc.healthy is True + + hc._maybe_update_failure_count(False, 'reason') + assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(True, 'reason') assert hc.current_consecutive_failures == 0 + assert 
hc.current_consecutive_successes == 1 + assert hc.healthy is True + hc._expired = True hc._maybe_update_failure_count(False, 'reason') assert hc.current_consecutive_failures == 1 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(False, 'reason') assert hc.current_consecutive_failures == 2 + assert hc.current_consecutive_successes == 0 assert hc.healthy is True hc._maybe_update_failure_count(False, 'reason') @@ -517,7 +538,7 @@ class TestThreadedHealthChecker(unittest.TestCase): liveness = [False, False, True] self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0) self.health_checker.threaded_health_checker.run() - assert self.clock.sleep.call_count == 3 + assert self.clock.sleep.call_count == 2 assert mock_maybe_update_failure_count.call_count == 2 @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start', http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/common/test_status_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_status_checker.py b/src/test/python/apache/aurora/executor/common/test_status_checker.py index 5be1981..8942639 100644 --- a/src/test/python/apache/aurora/executor/common/test_status_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_status_checker.py @@ -14,7 +14,7 @@ import threading -from mesos.interface.mesos_pb2 import TaskState +from mesos.interface import mesos_pb2 from apache.aurora.executor.common.status_checker import ( ChainedStatusChecker, @@ -62,11 +62,42 @@ def test_chained_health_interface(): assert si.started.is_set() assert chained_si.status is None - reason = StatusResult('derp', TaskState.Value('TASK_FAILED')) + reason = StatusResult('derp', mesos_pb2.TASK_FAILED) si2.set_status(reason) assert chained_si.status == reason assert chained_si.status.reason == 'derp' - assert TaskState.Name(chained_si.status.status) == 'TASK_FAILED' + assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED' + + for si in (si1, si2): + assert not si.stopped.is_set() + chained_si.stop() + for si in (si1, si2): + assert si.stopped.is_set() + + # A task may fail after transition into RUNNING state. We need to test + # the status is not memoized in ChainedStatusChecker. + si1 = EventHealth() + si2 = EventHealth() + chained_si = ChainedStatusChecker([si1, si2]) + + for si in (si1, si2): + assert not si.started.is_set() + chained_si.start() + for si in (si1, si2): + assert si.started.is_set() + + assert chained_si.status is None + reason2 = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) + si2.set_status(reason2) + assert chained_si.status == reason2 + assert chained_si.status.reason == 'Task is healthy.' 
+ assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_RUNNING' + + reason1 = StatusResult('derp', mesos_pb2.TASK_FAILED) + si1.set_status(reason1) + assert chained_si.status == reason1 + assert chained_si.status.reason == 'derp' + assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED' for si in (si1, si2): assert not si.stopped.is_set() http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/test_status_manager.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_status_manager.py b/src/test/python/apache/aurora/executor/test_status_manager.py index ce4679b..7c0efe8 100644 --- a/src/test/python/apache/aurora/executor/test_status_manager.py +++ b/src/test/python/apache/aurora/executor/test_status_manager.py @@ -16,9 +16,9 @@ import time from unittest import TestCase import mock -from mesos.interface.mesos_pb2 import TaskState +from mesos.interface import mesos_pb2 -from apache.aurora.executor.common.status_checker import StatusChecker +from apache.aurora.executor.common.status_checker import StatusChecker, StatusResult from apache.aurora.executor.status_manager import StatusManager @@ -28,23 +28,29 @@ class FakeStatusChecker(StatusChecker): @property def status(self): + status_result = None + if self.call_count == 1: + status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) if self.call_count == 2: - return TaskState.Value('TASK_KILLED') + status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING) + if self.call_count == 3: + status_result = StatusResult('Reason why a task failed.', mesos_pb2.TASK_KILLED) self.call_count += 1 - return None + return status_result class TestStatusManager(TestCase): def setUp(self): - self.callback_called = False + self.callback_call_count = 0 def test_run(self): checker = FakeStatusChecker() - def callback(result): - assert result == TaskState.Value('TASK_KILLED') - self.callback_called = True + def running(result): + self.callback_call_count += 1 + def shutdown(result): + self.callback_call_count += 1 mock_time = mock.create_autospec(spec=time, instance=True) - status_manager = StatusManager(checker, callback, mock_time) + status_manager = StatusManager(checker, running, shutdown, mock_time) status_manager.run() - assert mock_time.sleep.call_count == 2 - assert self.callback_called is True + assert mock_time.sleep.call_count == 3 + assert self.callback_call_count == 2 http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/test_thermos_executor.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py index 0bfe9e9..ac914db 100644 --- a/src/test/python/apache/aurora/executor/test_thermos_executor.py +++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py @@ -44,9 +44,9 @@ from apache.aurora.config.schema.base import ( ) from apache.aurora.executor.aurora_executor import AuroraExecutor from apache.aurora.executor.common.executor_timeout import ExecutorTimeout -from apache.aurora.executor.common.health_checker import HealthCheckerProvider +from apache.aurora.executor.common.health_checker import HealthChecker, HealthCheckerProvider from apache.aurora.executor.common.sandbox import DirectorySandbox, SandboxProvider -from apache.aurora.executor.common.status_checker 
import ChainedStatusChecker +from apache.aurora.executor.common.status_checker import ChainedStatusChecker, StatusCheckerProvider from apache.aurora.executor.common.task_runner import TaskError from apache.aurora.executor.status_manager import StatusManager from apache.aurora.executor.thermos_task_runner import ( @@ -100,6 +100,14 @@ class FailingSandboxProvider(SandboxProvider): return FailingSandbox(safe_mkdtemp(), exception_type=self._exception_type, **kwargs) +class FailingStatusCheckerProvider(StatusCheckerProvider): + def __init__(self, exception_type): + self._exception_type = exception_type + + def from_assigned_task(self, assigned_task, sandbox): + raise self._exception_type('Could not create status checker!') + + class SlowSandbox(DirectorySandbox): def __init__(self, *args, **kwargs): super(SlowSandbox, self).__init__(*args, **kwargs) @@ -217,18 +225,25 @@ def make_executor( ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start() task_description = make_task(task, assigned_ports=ports, instanceId=0) te.launchTask(proxy_driver, task_description) - te.status_manager_started.wait() - while len(proxy_driver.method_calls['sendStatusUpdate']) < 2: + is_health_check_enabled = False + for status_checker in te._chained_checker._status_checkers: + if isinstance(status_checker, HealthChecker): + is_health_check_enabled = True + break + + status_update_nums = 1 if is_health_check_enabled else 2 + while len(proxy_driver.method_calls['sendStatusUpdate']) < status_update_nums: time.sleep(0.1) # make sure startup was kosher updates = proxy_driver.method_calls['sendStatusUpdate'] - assert len(updates) == 2 + assert len(updates) == status_update_nums status_updates = [arg_tuple[0][0] for arg_tuple in updates] assert status_updates[0].state == mesos_pb2.TASK_STARTING - assert status_updates[1].state == mesos_pb2.TASK_RUNNING + if not is_health_check_enabled: + assert status_updates[1].state == mesos_pb2.TASK_RUNNING # wait for the runner to bind to a task while True: @@ -236,7 +251,6 @@ def make_executor( if runner: break time.sleep(0.1) - assert te.launched.is_set() return runner, te @@ -438,7 +452,7 @@ class TestThermosExecutor(object): executor.terminated.wait() updates = proxy_driver.method_calls['sendStatusUpdate'] - assert len(updates) == 3 + assert len(updates) == 2 assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED def test_task_health_ok(self): @@ -572,6 +586,20 @@ class TestThermosExecutor(object): assert updates[0][0][0].state == mesos_pb2.TASK_STARTING assert updates[1][0][0].state == mesos_pb2.TASK_FAILED + def test_failing_status_provider_initialize_unknown_exception(self): + proxy_driver = ProxyDriver() + + with temporary_dir() as td: + te = FastThermosExecutor( + runner_provider=make_provider(td), + sandbox_provider=DefaultTestSandboxProvider(), + status_providers=(FailingStatusCheckerProvider(exception_type=Exception),)) + te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI)) + proxy_driver.wait_stopped() + + updates = proxy_driver.method_calls['sendStatusUpdate'] + assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED + def test_waiting_executor(): proxy_driver = ProxyDriver()